You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 11:15:49 UTC
svn commit: r1560618 [3/5] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/bin/ qpid/cpp/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.receiver/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.sender/
qpid/cpp/bindings/...
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jan 23 10:15:46 2014
@@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const Bro
QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
- broker::QueueRegistry& queues = hb.getBroker().getQueues();
- queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
if (expect.empty()) {
QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
}
@@ -140,15 +138,6 @@ Primary::~Primary() {
haBroker.getObserver()->reset();
}
-void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
- if (replicationTest.useLevel(*q) == ALL) {
- boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
- ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
- q->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
- }
-}
-
void Primary::checkReady() {
bool activate = false;
{
@@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr
if (level) {
QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
<< " replication: " << printable(level));
- initializeQueue(q);
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -348,6 +336,7 @@ void Primary::opened(broker::Connection&
} else {
QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
i->second->setConnection(&connection);
+ backup = i->second;
}
if (info.getStatus() == JOINING) {
info.setStatus(CATCHUP);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/Primary.h Thu Jan 23 10:15:46 2014
@@ -125,7 +125,6 @@ class Primary : public Role
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
- void initializeQueue(boost::shared_ptr<broker::Queue>);
void checkReady();
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Jan 23 10:15:46 2014
@@ -55,8 +55,8 @@ QueueGuard::QueueGuard(broker::Queue& q,
info.printId(os) << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- queue.addObserver(observer);
- // Set first after calling addObserver so we know that the back of the
+ queue.getObservers().add(observer);
+ // Set first after adding the observer so we know that the back of the
// queue+1 is (or will be) a guarded position.
QueuePosition front, back;
q.getRange(front, back, broker::REPLICATOR);
@@ -86,7 +86,7 @@ void QueueGuard::dequeued(const Message&
}
void QueueGuard::cancel() {
- queue.removeObserver(observer);
+ queue.getObservers().remove(observer);
Mutex::ScopedLock l(lock);
if (cancelled) return;
QPID_LOG(debug, logPrefix << "Cancelled");
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 23 10:15:46 2014
@@ -21,8 +21,9 @@
#include "Event.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroke
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ // The QueueReplicator will take over setting replication IDs.
+ boost::shared_ptr<IdSetter> setter =
+ q->getMessageInterceptors().findType<IdSetter>();
+ if (setter) q->getMessageInterceptors().remove(setter);
+
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -175,7 +181,7 @@ void QueueReplicator::activate() {
boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
// Enable callback to destroy()
- queue->addObserver(
+ queue->getObservers().add(
boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(B
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
- arguments.setString(ReplicatingSubscription::QPID_ID_SET,
- encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+ boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+ if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const
}
// Called in connection thread of the queues bridge to primary.
+
void QueueReplicator::route(Deliverable& deliverable)
{
try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const stri
nextId = decodeStr<IdEvent>(data).id;
}
-ReplicationId QueueReplicator::getMaxId() {
- Mutex::ScopedLock l(lock);
- return maxId;
-}
-
void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { re
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
void QueueReplicator::promoted() {
- // Promoted to primary, deal with auto-delete now.
- if (queue && queue->isAutoDelete() && subscribed) {
- // Make a temporary shared_ptr to prevent premature deletion of queue.
- // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
- // which could delete the queue while it's still running it's destroyed logic.
- boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ if (queue) {
+ // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+ // Process auto-deletes
+ if (queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 23 10:15:46 2014
@@ -85,8 +85,6 @@ class QueueReplicator : public broker::E
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
- ReplicationId getMaxId();
-
// No-op unused Exchange virtual functions.
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueSnapshot.h Thu Jan 23 10:15:46 2014
@@ -53,7 +53,7 @@ class QueueSnapshot : public broker::Qu
void requeued(const broker::Message&) {}
- ReplicationIdSet snapshot() {
+ ReplicationIdSet getSnapshot() {
sys::Mutex::ScopedLock l(lock);
return set;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jan 23 10:15:46 2014
@@ -22,7 +22,7 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"
#include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize
info.printId(os) << ": ";
logPrefix = os.str();
- // If this is a non-cluster standalone replication then we need to
- // set up an IdSetter if there is not already one.
- boost::shared_ptr<IdSetter> idSetter;
- queue->getMessageInterceptors().each(
- boost::bind(©If, _1, boost::ref(idSetter)));
- if (!idSetter) {
- QPID_LOG(debug, logPrefix << "Standalone replication");
- queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
- }
-
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -150,16 +139,16 @@ void ReplicatingSubscription::initialize
// However we must attach the observer _before_ we snapshot for
// initial dequeues to be sure we don't miss any dequeues
// between the snapshot and attaching the observer.
- queue->addObserver(
+ queue->getObservers().add(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
- queue->removeObserver(
+ queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
}
- ReplicationIdSet primaryIds = snapshot->snapshot();
+ ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
@@ -254,7 +243,7 @@ void ReplicatingSubscription::cancel()
}
QPID_LOG(debug, logPrefix << "Cancelled");
if (primary) primary->removeReplica(*this);
- getQueue()->removeObserver(
+ getQueue()->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
guard->cancel();
ConsumerImpl::cancel();
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jan 23 10:15:46 2014
@@ -68,89 +68,89 @@ class Primary;
* ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
*/
class ReplicatingSubscription :
- public broker::SemanticState::ConsumerImpl,
- public broker::QueueObserver
+ public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
{
-public:
-typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+ public:
+ typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
-class Factory : public broker::ConsumerFactory {
-public:
-Factory(HaBroker& hb) : haBroker(hb) {}
-
-HaBroker& getHaBroker() const { return haBroker; }
-
-boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
-broker::SemanticState* parent,
- const std::string& name, boost::shared_ptr<broker::Queue> ,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
-private:
-HaBroker& haBroker;
-};
-
-// Argument names for consume command.
-static const std::string QPID_REPLICATING_SUBSCRIPTION;
-static const std::string QPID_BROKER_INFO;
-static const std::string QPID_ID_SET;
-// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
-static const std::string QPID_QUEUE_REPLICATOR;
-static const std::string QPID_TX_REPLICATOR;
+ class Factory : public broker::ConsumerFactory {
+ public:
+ Factory(HaBroker& hb) : haBroker(hb) {}
+
+ HaBroker& getHaBroker() const { return haBroker; }
+
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ private:
+ HaBroker& haBroker;
+ };
+
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_BROKER_INFO;
+ static const std::string QPID_ID_SET;
+ // Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
+ static const std::string QPID_QUEUE_REPLICATOR;
+ static const std::string QPID_TX_REPLICATOR;
-ReplicatingSubscription(HaBroker& haBroker,
+ ReplicatingSubscription(HaBroker& haBroker,
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
const framing::FieldTable& arguments);
-~ReplicatingSubscription();
-
-
-// Consumer overrides.
-bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
-void cancel();
-void acknowledged(const broker::DeliveryRecord&);
-bool browseAcquired() const { return true; }
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool hideDeletedError() { return true; }
-
-// QueueObserver overrides
-void enqueued(const broker::Message&) {}
-void dequeued(const broker::Message&);
-void acquired(const broker::Message&) {}
-void requeued(const broker::Message&) {}
-
-/** A ReplicatingSubscription is a passive observer, not counted for auto
- * deletion and immediate message purposes.
- */
-bool isCounted() { return false; }
-
-/** Initialization that must be done separately from construction
- * because it requires a shared_ptr to this to exist.
- */
-void initialize();
-
-BrokerInfo getBrokerInfo() const { return info; }
+ ~ReplicatingSubscription();
-/** Skip replicating enqueue of of ids. */
-void addSkip(const ReplicationIdSet& ids);
-protected:
-bool doDispatch();
-
-private:
-std::string logPrefix;
-QueuePosition position;
-ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
-ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
-ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
-bool ready;
-bool cancelled;
-BrokerInfo info;
-boost::shared_ptr<QueueGuard> guard;
+ // Consumer overrides.
+ bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
+ void cancel();
+ void acknowledged(const broker::DeliveryRecord&);
+ bool browseAcquired() const { return true; }
+ // Hide the "queue deleted" error for a ReplicatingSubscription when a
+ // queue is deleted, this is normal and not an error.
+ bool hideDeletedError() { return true; }
+
+ // QueueObserver overrides
+ void enqueued(const broker::Message&) {}
+ void dequeued(const broker::Message&);
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
+
+ /** A ReplicatingSubscription is a passive observer, not counted for auto
+ * deletion and immediate message purposes.
+ */
+ bool isCounted() { return false; }
+
+ /** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
+ */
+ void initialize();
+
+ BrokerInfo getBrokerInfo() const { return info; }
+
+ /** Skip replicating enqueue of of ids. */
+ void addSkip(const ReplicationIdSet& ids);
+
+ protected:
+ bool doDispatch();
+
+ private:
+ std::string logPrefix;
+ QueuePosition position;
+ ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
+ ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
+ ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
+ bool ready;
+ bool cancelled;
+ BrokerInfo info;
+ boost::shared_ptr<QueueGuard> guard;
HaBroker& haBroker;
boost::shared_ptr<Primary> primary;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
LinearStore issues:
Store:
@@ -27,12 +46,23 @@ Store:
* Store analysis and status
* Recovery/reading of message content
+8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
+
+9. Complete exceptions - several exceptions thrown using jexception have no exception numbers
+
Current bugs and performance issues:
------------------------------------
-1. RH Bugzilla 1035843 - Slow performance for producers
+1. BZ 1035843 - Slow performance for producers
2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue
-4. RH Bugzilla 1036026 - Unable to create durable queue - framing error
+4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
+5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
+6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
+7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
+8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
+9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
+10. QPID-5464 - Incompletely created journal files accumulate in EFP
+11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (ie spans files), but following file is uninitialized causes crash
Code tidy-up
------------
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.cpp Thu Jan 23 10:15:46 2014
@@ -30,23 +30,36 @@
namespace qpid {
namespace linearstore {
-InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
+InactivityFireEvent::InactivityFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout):
+ ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
-void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
+void InactivityFireEvent::fire() {
+ ::qpid::sys::Mutex::ScopedLock sl(_ife_lock);
+ if (_parent) {
+ _parent->flushFire();
+ }
+}
-GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {}
+GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout):
+ ::qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p)
+{}
-void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() {
+ ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock);
+ if (_parent) {
+ _parent->getEventsFire();
+ }
+}
-JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+JournalImpl::JournalImpl(::qpid::sys::Timer& timer_,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* a,
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalLogRef),
timer(timer_),
@@ -76,7 +89,7 @@ JournalImpl::~JournalImpl()
if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
- catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
+ catch (const ::qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); }
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
@@ -90,7 +103,7 @@ JournalImpl::~JournalImpl()
}
void
-JournalImpl::initManagement(qpid::management::ManagementAgent* a)
+JournalImpl::initManagement(::qpid::management::ManagementAgent* a)
{
_agent = a;
if (_agent != 0)
@@ -117,10 +130,10 @@ JournalImpl::initManagement(qpid::manage
void
-JournalImpl::initialize(qpid::linearstore::journal::EmptyFilePool* efpp_,
+JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp)
+ ::qpid::linearstore::journal::aio_callback* const cbp)
{
// efpp->createJournal(_jdir);
// QLS_LOG2(notice, _jid, "Initialized");
@@ -152,10 +165,10 @@ JournalImpl::initialize(qpid::linearstor
}
void
-JournalImpl::recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp,
+ ::qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id)
@@ -180,6 +193,7 @@ JournalImpl::recover(boost::shared_ptr<q
}
*/
+ // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -196,8 +210,8 @@ JournalImpl::recover(boost::shared_ptr<q
if (prep_tx_list_ptr)
{
for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- qpid::linearstore::journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
- for (qpid::linearstore::journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
if (tdl_itr->enq_flag_) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->rid_);
} else { // dequeue op
@@ -237,8 +251,11 @@ JournalImpl::recover_complete()
void
-JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const bool transient)
+JournalImpl::enqueue_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient)
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
@@ -250,8 +267,9 @@ JournalImpl::enqueue_data_record(const v
}
void
-JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient)
+JournalImpl::enqueue_extern_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient)
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
@@ -263,12 +281,17 @@ JournalImpl::enqueue_extern_data_record(
}
void
-JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid, const bool transient)
+JournalImpl::enqueue_txn_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
+ handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, tpc_flag, transient));
if (_mgmtObject.get() != 0)
{
@@ -281,12 +304,15 @@ JournalImpl::enqueue_txn_data_record(con
}
void
-JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const std::string& xid, const bool transient)
+JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
+ handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, tpc_flag, transient));
if (_mgmtObject.get() != 0)
{
@@ -299,7 +325,8 @@ JournalImpl::enqueue_extern_txn_data_rec
}
void
-JournalImpl::dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit)
+JournalImpl::dequeue_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const bool txn_coml_commit)
{
handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
@@ -312,11 +339,14 @@ JournalImpl::dequeue_data_record(qpid::l
}
void
-JournalImpl::dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
+JournalImpl::dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool txn_coml_commit)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
- handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, tpc_flag, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
@@ -329,7 +359,8 @@ JournalImpl::dequeue_txn_data_record(qpi
}
void
-JournalImpl::txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_abort(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
@@ -341,7 +372,8 @@ JournalImpl::txn_abort(qpid::linearstore
}
void
-JournalImpl::txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid)
+JournalImpl::txn_commit(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
@@ -366,12 +398,12 @@ JournalImpl::stop(bool block_till_aio_cm
}
}
-qpid::linearstore::journal::iores
+::qpid::linearstore::journal::iores
JournalImpl::flush(const bool block_till_aio_cmpl)
{
- const qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
+ const ::qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl);
{
- qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+ ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
return res;
@@ -380,7 +412,7 @@ JournalImpl::flush(const bool block_till
void
JournalImpl::getEventsFire()
{
- qpid::sys::Mutex::ScopedLock sl(_getf_lock);
+ ::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -394,7 +426,7 @@ JournalImpl::flushFire()
flushTriggeredFlag = false;
} else {
if (!flushTriggeredFlag) {
- flush();
+ flush(false);
flushTriggeredFlag = true;
}
}
@@ -405,20 +437,20 @@ JournalImpl::flushFire()
}
void
-JournalImpl::wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl)
+JournalImpl::wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl)
{
- for (std::vector<qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
+ for (std::vector< ::qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
{
DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
{
switch (dtokp->wstate())
{
- case qpid::linearstore::journal::data_tok::ENQ:
+ case ::qpid::linearstore::journal::data_tok::ENQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
dtokp->getSourceMessage()->enqueueComplete();
break;
- case qpid::linearstore::journal::data_tok::DEQ:
+ case ::qpid::linearstore::journal::data_tok::DEQ:
//std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
@@ -443,25 +475,25 @@ JournalImpl::createStore() {
}
void
-JournalImpl::handleIoResult(const qpid::linearstore::journal::iores r)
+JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r)
{
writeActivityFlag = true;
switch (r)
{
- case qpid::linearstore::journal::RHM_IORES_SUCCESS:
+ case ::qpid::linearstore::journal::RHM_IORES_SUCCESS:
return;
default:
{
std::ostringstream oss;
- oss << "Unexpected I/O response (" << qpid::linearstore::journal::iores_str(r) << ").";
+ oss << "Unexpected I/O response (" << ::qpid::linearstore::journal::iores_str(r) << ").";
QLS_LOG2(error, _jid, oss.str());
THROW_STORE_FULL_EXCEPTION(oss.str());
}
}
}
-qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
- qpid::management::Args& /*args*/,
+::qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
+ ::qpid::management::Args& /*args*/,
std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/JournalImpl.h Thu Jan 23 10:15:46 2014
@@ -44,86 +44,90 @@ namespace journal {
class JournalImpl;
class JournalLogImpl;
-class InactivityFireEvent : public qpid::sys::TimerTask
+class InactivityFireEvent : public ::qpid::sys::TimerTask
{
JournalImpl* _parent;
- qpid::sys::Mutex _ife_lock;
+ ::qpid::sys::Mutex _ife_lock;
public:
- InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+ InactivityFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout);
virtual ~InactivityFireEvent() {}
void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+ inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
};
-class GetEventsFireEvent : public qpid::sys::TimerTask
+class GetEventsFireEvent : public ::qpid::sys::TimerTask
{
JournalImpl* _parent;
- qpid::sys::Mutex _gefe_lock;
+ ::qpid::sys::Mutex _gefe_lock;
public:
- GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout);
+ GetEventsFireEvent(JournalImpl* p,
+ const ::qpid::sys::Duration timeout);
virtual ~GetEventsFireEvent() {}
void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+ inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
};
-class JournalImpl : public qpid::broker::ExternalQueueStore, public qpid::linearstore::journal::jcntl, public qpid::linearstore::journal::aio_callback
+class JournalImpl : public ::qpid::broker::ExternalQueueStore,
+ public ::qpid::linearstore::journal::jcntl,
+ public ::qpid::linearstore::journal::aio_callback
{
public:
typedef boost::function<void (JournalImpl&)> DeleteCallback;
protected:
- qpid::sys::Timer& timer;
+ ::qpid::sys::Timer& timer;
JournalLogImpl& _journalLogRef;
bool getEventsTimerSetFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- qpid::sys::Mutex _getf_lock;
- qpid::sys::Mutex _read_lock;
+ boost::intrusive_ptr< ::qpid::sys::TimerTask> getEventsFireEventsPtr;
+ ::qpid::sys::Mutex _getf_lock;
+ ::qpid::sys::Mutex _read_lock;
bool writeActivityFlag;
bool flushTriggeredFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+ boost::intrusive_ptr< ::qpid::sys::TimerTask> inactivityFireEventPtr;
- qpid::management::ManagementAgent* _agent;
- qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
+ ::qpid::management::ManagementAgent* _agent;
+ ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;
DeleteCallback deleteCallback;
public:
- JournalImpl(qpid::sys::Timer& timer,
+ JournalImpl(::qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent,
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* agent,
DeleteCallback deleteCallback=DeleteCallback() );
virtual ~JournalImpl();
- void initManagement(qpid::management::ManagementAgent* agent);
+ void initManagement(::qpid::management::ManagementAgent* agent);
- void initialize(qpid::linearstore::journal::EmptyFilePool* efp,
+ void initialize(::qpid::linearstore::journal::EmptyFilePool* efp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp);
+ ::qpid::linearstore::journal::aio_callback* const cbp);
- inline void initialize(qpid::linearstore::journal::EmptyFilePool* efpp,
+ inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks) {
initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this);
}
- void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+ void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
- qpid::linearstore::journal::aio_callback* const cbp,
+ ::qpid::linearstore::journal::aio_callback* const cbp,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
uint64_t& highest_rid,
uint64_t queue_id);
- inline void recover(boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpm,
+ inline void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr,
@@ -135,47 +139,62 @@ class JournalImpl : public qpid::broker:
void recover_complete();
// Overrides for write inactivity timer
- void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient = false);
+ void enqueue_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient);
+
+ void enqueue_extern_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const bool transient);
+
+ void enqueue_txn_data_record(const void* const data_buff,
+ const size_t tot_data_len,
+ const size_t this_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len,
+ ::qpid::linearstore::journal::data_tok* dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool transient);
+
+ void dequeue_data_record(::qpid::linearstore::journal::data_tok*
+ const dtokp,
+ const bool txn_coml_commit);
+
+ void dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp,
+ const std::string& xid,
+ const bool tpc_flag,
+ const bool txn_coml_commit);
- void enqueue_extern_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const bool transient = false);
+ void txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
- void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, qpid::linearstore::journal::data_tok* dtokp, const std::string& xid,
- const bool transient = false);
-
- void enqueue_extern_txn_data_record(const size_t tot_data_len, qpid::linearstore::journal::data_tok* dtokp,
- const std::string& xid, const bool transient = false);
-
- void dequeue_data_record(qpid::linearstore::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
-
- void dequeue_txn_data_record(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
-
- void txn_abort(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
-
- void txn_commit(qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
+ void txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, const std::string& xid);
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
- qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl = false);
+ ::qpid::linearstore::journal::iores flush(const bool block_till_aio_cmpl);
// TimerTask callback
void getEventsFire();
void flushFire();
// AIO callbacks
- virtual void wr_aio_cb(std::vector<qpid::linearstore::journal::data_tok*>& dtokl);
+ virtual void wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl);
virtual void rd_aio_cb(std::vector<uint16_t>& pil);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ ::qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return _mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t,
- qpid::management::Args&,
- std::string&);
+ ::qpid::management::Manageable::status_t ManagementMethod(uint32_t,
+ ::qpid::management::Args&,
+ std::string&);
void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
@@ -188,7 +207,7 @@ class JournalImpl : public qpid::broker:
timer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
- void handleIoResult(const qpid::linearstore::journal::iores r);
+ void handleIoResult(const ::qpid::linearstore::journal::iores r);
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
@@ -203,23 +222,27 @@ class JournalImpl : public qpid::broker:
class TplJournalImpl : public JournalImpl
{
public:
- TplJournalImpl(qpid::sys::Timer& timer,
+ TplJournalImpl(::qpid::sys::Timer& timer,
const std::string& journalId,
const std::string& journalDirectory,
JournalLogImpl& journalLogRef,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent) :
+ const ::qpid::sys::Duration getEventsTimeout,
+ const ::qpid::sys::Duration flushTimeout,
+ ::qpid::management::ManagementAgent* agent) :
JournalImpl(timer, journalId, journalDirectory, journalLogRef, getEventsTimeout, flushTimeout, agent)
{}
virtual ~TplJournalImpl() {}
// Special version of read_data_record that ignores transactions - needed when reading the TPL
- inline qpid::linearstore::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
- void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
- qpid::linearstore::journal::data_tok* const dtokp) {
- return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+ inline ::qpid::linearstore::journal::iores read_data_record(void** const datapp,
+ std::size_t& dsize,
+ void** const xidpp,
+ std::size_t& xidsize,
+ bool& transient,
+ bool& external,
+ ::qpid::linearstore::journal::data_tok* const dtokp) {
+ return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, false);
}
}; // class TplJournalImpl
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Thu Jan 23 10:15:46 2014
@@ -51,16 +51,6 @@ qpid::sys::Duration MessageStoreImpl::de
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const uint64_t rid_,
- const bool deq_flag_,
- const bool commit_flag_,
- const bool tpc_flag_) :
- rid(rid_),
- deq_flag(deq_flag_),
- commit_flag(commit_flag_),
- tpc_flag(tpc_flag_)
-{}
-
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
defaultEfpPartitionNumber(0),
defaultEfpFileSize_kib(0),
@@ -351,7 +341,7 @@ void MessageStoreImpl::chkTplStoreInit()
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
+ tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -594,6 +584,13 @@ void MessageStoreImpl::recover(qpid::bro
txn_list prepared;
recoverLockedMappings(prepared);
+ std::ostringstream oss;
+ oss << "Recovered transaction prepared list:";
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ oss << std::endl << " " << str2hexnum(i->xid);
+ }
+ QLS_LOG(debug, oss.str());
+
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
message_index messages;//id->message
@@ -601,7 +598,7 @@ void MessageStoreImpl::recover(qpid::bro
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
- //read all queues, calls recoversMessages
+ //read all queues, calls recoversMessages for each queue
recoverQueues(txn, registry_, queues, prepared, messages);
//recover exchange & bindings:
@@ -621,6 +618,7 @@ void MessageStoreImpl::recover(qpid::bro
}
//recover transactions:
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
if (mgmtObject.get() != 0) {
@@ -629,20 +627,20 @@ void MessageStoreImpl::recover(qpid::bro
}
std::string xid = pt.xid;
-
- // Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ bool commitFlag = txn_op_stats.abortCnt == 0;
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = txn_op_stats.deqCnt > 0;
- if (citr->second.tpc_flag) {
+ if (txn_op_stats.tpcCnt > 0) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
- tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->recoverDtok(txn_op_stats.rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
@@ -661,12 +659,12 @@ void MessageStoreImpl::recover(qpid::bro
}
if (incomplTplTxnFlag) {
- tpcc->complete(citr->second.commit_flag);
+ tpcc->complete(commitFlag);
}
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
- opcc->recoverDtok(citr->second.rid, xid);
+ opcc->recoverDtok(txn_op_stats.rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
@@ -680,11 +678,12 @@ void MessageStoreImpl::recover(qpid::bro
}
}
if (incomplTplTxnFlag) {
- opcc->complete(citr->second.commit_flag);
+ opcc->complete(commitFlag);
} else {
- completed(*opcc.get(), citr->second.commit_flag);
+ completed(*opcc.get(), commitFlag);
}
}
+
}
registry_.recoveryComplete();
}
@@ -888,12 +887,13 @@ void MessageStoreImpl::recoverMessages(T
bool externalFlag = false;
DataTokenImpl dtok;
dtok.set_wstate(DataTokenImpl::NONE);
+ qpid::linearstore::journal::txn_map& txn_map_ref = tplStorePtr->get_txn_map();
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
- qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok, false);
switch (res)
{
@@ -907,7 +907,7 @@ void MessageStoreImpl::recoverMessages(T
msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
} else {
headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
- qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize);
msg = recovery.recoverMessage(headerBuff);
}
msg->setPersistenceId(dtok.rid());
@@ -932,30 +932,30 @@ void MessageStoreImpl::recoverMessages(T
} else {
uint64_t rid = dtok.rid();
std::string xid(i->xid);
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-
- // deq present in prepared list: this xid is part of incomplete txn commit/abort
- // or this is a 1PC txn that must be rolled forward
- if (citr->second.deq_flag || !citr->second.tpc_flag) {
+ qpid::linearstore::journal::txn_data_list_t tdl = txn_map_ref.get_tdata_list(xid);
+ if (tdl.size() == 0) THROW_STORE_EXCEPTION("XID not found in txn_map");
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.deqCnt > 0 || txn_op_stats.tpcCnt == 0) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
- if (!citr->second.commit_flag) {
+ if (txn_op_stats.abortCnt > 0) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
} else {
// Enqueue and/or dequeue tx
qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_data_list_t txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_ && j->rid_ == rid) enq = true;
- else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
+ for (qpid::linearstore::journal::tdl_itr_t j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->enq_flag_ && j->rid_ == rid)
+ enq = true;
+ else if (!j->enq_flag_ && j->drid_ == rid)
+ deq = true;
}
- if (enq && !deq && citr->second.commit_flag) {
+ if (enq && !deq && txn_op_stats.abortCnt == 0) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
@@ -969,10 +969,14 @@ void MessageStoreImpl::recoverMessages(T
dtok.reset();
dtok.set_wstate(DataTokenImpl::NONE);
- if (xidbuff)
+ if (xidbuff) {
::free(xidbuff);
- else if (dbuff)
+ xidbuff = NULL;
+ }
+ if (dbuff) {
::free(dbuff);
+ dbuff = NULL;
+ }
aio_sleep_cnt = 0;
break;
}
@@ -1033,77 +1037,6 @@ int MessageStoreImpl::enqueueMessage(Txn
return count;
}
-void MessageStoreImpl::readTplStore()
-{
- tplRecoverMap.clear();
- qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
- DataTokenImpl dtok;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- bool done = false;
- try {
- unsigned aio_sleep_cnt = 0;
- while (!done) {
- dtok.reset();
- dtok.set_wstate(DataTokenImpl::ENQ);
- qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
- switch (res) {
- case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
- // Every TPL record contains both data and an XID
- assert(dbuffSize>0);
- assert(xidbuffSize>0);
- std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
- bool is2PC = *(static_cast<char*>(dbuff)) != 0;
-
- // Check transaction details; add to recover map
- qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
- if (!txnList.empty()) { // xid found in tmap
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- uint64_t rid = 0;
-
- // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
- // Note: will apply to both 1PC and 2PC transactions.
- bool commitFlag = true;
-
- for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->enq_flag_) {
- rid = j->rid_;
- enqCnt++;
- } else {
- commitFlag = j->commit_flag_;
- deqCnt++;
- }
- }
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
- }
-
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
- ::usleep(AIO_SLEEP_TIME_US);
- break;
- case qpid::linearstore::journal::RHM_IORES_EMPTY:
- done = true;
- break; // done with all messages. (add call in jrnl to test that _emap is empty.)
- default:
- std::ostringstream oss;
- oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
- THROW_STORE_EXCEPTION(oss.str());
- } // switch
- }
- } catch (const qpid::linearstore::journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
- }
-}
void MessageStoreImpl::recoverTplStore()
{
@@ -1114,11 +1047,7 @@ void MessageStoreImpl::recoverTplStore()
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
-
- // Load tplRecoverMap by reading the TPL store
- readTplStore();
-
- tplStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start TPL
}
}
@@ -1126,28 +1055,32 @@ void MessageStoreImpl::recoverLockedMapp
{
if (!tplStorePtr->is_ready())
recoverTplStore();
-
- // Abort unprepared xids and populate the locked maps
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
- if (tplStorePtr->is_ready()) {
- readTplStore();
- } else {
+ if (!tplStorePtr->is_ready()) {
recoverTplStore();
}
- for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- // Discard all txns that are to be rolled forward/back and 1PC transactions
- if (!i->second.deq_flag && i->second.tpc_flag)
- xids.insert(i->first);
+ std::vector<std::string> xidList;
+ tplStorePtr->get_txn_map().xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i=xidList.begin(); i!=xidList.end(); ++i) {
+ qpid::linearstore::journal::txn_data_list_t tdl = tplStorePtr->get_txn_map().get_tdata_list(*i);
+ qpid::linearstore::journal::txn_op_stats_t txn_op_stats(tdl);
+ if (txn_op_stats.tpcCnt > 0) {
+ if (txn_op_stats.enqCnt - txn_op_stats.deqCnt > 0) {
+ xids.insert(*i);
+ }
+ }
}
}
@@ -1186,7 +1119,7 @@ void MessageStoreImpl::flush(const qpid:
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
- /*mrg::journal::iores res =*/ jc->flush();
+ /*mrg::journal::iores res =*/ jc->flush(false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
@@ -1258,7 +1191,7 @@ void MessageStoreImpl::store(const qpid:
if (txn_->getXid().empty()) {
jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message_->isPersistent());
} else {
- jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), !message_->isPersistent());
+ jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn_->getXid(), txn_->isTPC(), !message_->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
@@ -1309,9 +1242,10 @@ void MessageStoreImpl::async_dequeue(qpi
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg_->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
+ TxnCtxt* txn = 0;
std::string tid;
if (ctxt_) {
- TxnCtxt* txn = check(ctxt_);
+ txn = check(ctxt_);
tid = txn->getXid();
}
// Manually increase the ref count, as raw pointers are used beyond this point
@@ -1319,9 +1253,9 @@ void MessageStoreImpl::async_dequeue(qpi
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue_.getExternalQueueStore());
if (tid.empty()) {
- jc->dequeue_data_record(ddtokp.get());
+ jc->dequeue_data_record(ddtokp.get(), false);
} else {
- jc->dequeue_txn_data_record(ddtokp.get(), tid);
+ jc->dequeue_txn_data_record(ddtokp.get(), tid, txn?txn->isTPC():false, false);
}
} catch (const qpid::linearstore::journal::jexception& e) {
ddtokp->release();
@@ -1341,7 +1275,7 @@ void MessageStoreImpl::completed(TxnCtxt
DataTokenImpl* dtokp = txn_.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), commit_);
+ tplStorePtr->dequeue_txn_data_record(txn_.getDtok(), txn_.getXid(), txn_.isTPC(), commit_);
}
txn_.complete(commit_);
if (mgmtObject.get() != 0) {
@@ -1376,12 +1310,16 @@ void MessageStoreImpl::prepare(qpid::bro
{
checkInit();
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt_);
+//std::string xid=txn->getXid(); std::cout << "*** MessageStoreImpl::prepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
void MessageStoreImpl::localPrepare(TxnCtxt* ctxt_)
{
+//std::string xid=ctxt_->getXid(); std::cout << "*** MessageStoreImpl::localPrepare() xid=" << std::hex;
+//for (unsigned i=0; i<xid.length(); ++i) std::cout << "\\" << (int)xid.at(i); std::cout << " ***" << std::dec << std::endl;
try {
chkTplStoreInit(); // Late initialize (if needed)
@@ -1394,7 +1332,7 @@ void MessageStoreImpl::localPrepare(TxnC
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
char tpcFlag = static_cast<char>(ctxt_->isTPC());
- tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), false);
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt_->getXid(), tpcFlag != 0, false);
ctxt_->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt_->sync();
@@ -1572,6 +1510,15 @@ void MessageStoreImpl::journalDeleted(Jo
journalList.erase(j_.id());
}
+std::string MessageStoreImpl::str2hexnum(const std::string& str) {
+ std::ostringstream oss;
+ oss << "(" << str.size() << ")0x" << std::hex;
+ for (unsigned i=str.size(); i>0; --i) {
+ oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+ }
+ return oss.str();
+}
+
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
qpid::Options(name_),
truncateFlag(defTruncateFlag),
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Thu Jan 23 10:15:46 2014
@@ -89,19 +89,6 @@ class MessageStoreImpl : public qpid::br
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for Transaction Recover List (TPL) recover state
- struct TplRecoverStruct {
- uint64_t rid; // rid of TPL record
- bool deq_flag;
- bool commit_flag;
- bool tpc_flag;
- TplRecoverStruct(const uint64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
- };
- typedef TplRecoverStruct TplRecover;
- typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
- typedef std::map<std::string, TplRecover> TplRecoverMap;
- typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
-
typedef std::map<std::string, JournalImpl*> JournalListMap;
typedef JournalListMap::iterator JournalListMapItr;
@@ -127,7 +114,6 @@ class MessageStoreImpl : public qpid::br
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
- TplRecoverMap tplRecoverMap;
qpid::sys::Mutex tplInitLock;
JournalListMap journalList;
qpid::sys::Mutex journalListLock;
@@ -202,7 +188,6 @@ class MessageStoreImpl : public qpid::br
queue_index& index,
txn_list& locked,
message_index& prepared);
- void readTplStore();
void recoverTplStore();
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
@@ -250,18 +235,7 @@ class MessageStoreImpl : public qpid::br
}
void chkTplStoreInit();
- // debug aid for printing XIDs that may contain non-printable chars
- static std::string xid2str(const std::string xid) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- for (unsigned i=0; i<xid.size(); i++) {
- if (isprint(xid[i]))
- oss << xid[i];
- else
- oss << "/" << std::setw(2) << (int)((char)xid[i]);
- }
- return oss.str();
- }
+ static std::string str2hexnum(const std::string& str);
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp Thu Jan 23 10:15:46 2014
@@ -114,7 +114,7 @@ void TxnCtxt::sync() {
void TxnCtxt::jrnl_flush(JournalImpl* jc) {
if (jc && !(jc->is_txn_synced(getXid())))
- jc->flush();
+ jc->flush(false);
}
void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp Thu Jan 23 10:15:46 2014
@@ -51,6 +51,10 @@ void LinearFileController::initialize(co
}
void LinearFileController::finalize() {
+ if (currentJournalFilePtr_) {
+ currentJournalFilePtr_->close();
+ currentJournalFilePtr_ = NULL;
+ }
while (!journalFileList_.empty()) {
delete journalFileList_.front();
journalFileList_.pop_front();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org