You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/08 11:24:03 UTC

svn commit: r1616703 - in /qpid/trunk/qpid/cpp/src/qpid/ha: Primary.cpp Primary.h PrimaryTxObserver.cpp PrimaryTxObserver.h ReplicatingSubscription.cpp ReplicatingSubscription.h

Author: aconway
Date: Fri Aug  8 09:24:03 2014
New Revision: 1616703

URL: http://svn.apache.org/r1616703
Log:
QPID-5974: HA qpid-txtest2 can bring down a cluster (JERR_MAP_LOCKED)

Problem: transactional dequeues can be sent via two paths: as part of the transaction, and
via the normal queue replication. If a journal is involved, this can result in store errors
if the normal replication path attempts to dequeue before the transaction.

Solution: this is also the case for enqueues, and we already have code in place to skip replication
of tx enqueues via the normal route. Copied the same logic for dequeues.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Aug  8 09:24:03 2014
@@ -265,14 +265,24 @@ void Primary::addReplica(ReplicatingSubs
     replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
 }
 
-void Primary::skip(
+void Primary::skipEnqueues(
     const types::Uuid& backup,
     const boost::shared_ptr<broker::Queue>& queue,
     const ReplicationIdSet& ids)
 {
     sys::Mutex::ScopedLock l(lock);
     ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
-    if (i != replicas.end()) i->second->addSkip(ids);
+    if (i != replicas.end()) i->second->skipEnqueues(ids);
+}
+
+void Primary::skipDequeues(
+    const types::Uuid& backup,
+    const boost::shared_ptr<broker::Queue>& queue,
+    const ReplicationIdSet& ids)
+{
+    sys::Mutex::ScopedLock l(lock);
+    ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+    if (i != replicas.end()) i->second->skipDequeues(ids);
 }
 
 // Called from ReplicatingSubscription::cancel

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Fri Aug  8 09:24:03 2014
@@ -90,9 +90,14 @@ class Primary : public Role
     void removeReplica(const ReplicatingSubscription&);
 
     /** Skip replication of ids to queue on backup. */
-    void skip(const types::Uuid& backup,
-              const boost::shared_ptr<broker::Queue>& queue,
-              const ReplicationIdSet& ids);
+    void skipEnqueues(const types::Uuid& backup,
+                      const boost::shared_ptr<broker::Queue>& queue,
+                      const ReplicationIdSet& ids);
+
+    /** Skip replication of dequeue of ids to queue on backup. */
+    void skipDequeues(const types::Uuid& backup,
+                      const boost::shared_ptr<broker::Queue>& queue,
+                      const ReplicationIdSet& ids);
 
     // Called via BrokerObserver
     void queueCreate(const QueuePtr&);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Fri Aug  8 09:24:03 2014
@@ -165,6 +165,7 @@ void PrimaryTxObserver::dequeue(
     if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
         QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
         empty = false;
+        dequeues[q] += id;
         txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
     }
 }
@@ -180,25 +181,30 @@ struct Skip {
          const ReplicationIdSet& ids_) :
         backup(backup_), queue(queue_), ids(ids_) {}
 
-    void skip(Primary& p) const { p.skip(backup, queue, ids); }
+    void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); }
+    void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); }
 };
 } // namespace
 
+void PrimaryTxObserver::skip(Mutex::ScopedLock&) {
+    // Tell replicating subscriptions to skip IDs in the transaction.
+    vector<Skip> skipEnq, skipDeq;
+    for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) {
+        for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+            skipEnq.push_back(Skip(*b, q->first, q->second));
+        for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q)
+            skipDeq.push_back(Skip(*b, q->first, q->second));
+    }
+    Mutex::ScopedUnlock u(lock); // Outside lock
+    for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary)));
+    for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary)));
+}
+
 bool PrimaryTxObserver::prepare() {
     QPID_LOG(debug, logPrefix << "Prepare " << backups);
-    vector<Skip> skips;
-    {
-        Mutex::ScopedLock l(lock);
-        checkState(SENDING, "Too late for prepare");
-        state = PREPARING;
-        // Tell replicating subscriptions to skip IDs in the transaction.
-        for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
-            for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
-                skips.push_back(Skip(*b, q->first, q->second));
-    }
-    // Outside lock
-    for_each(skips.begin(), skips.end(),
-             boost::bind(&Skip::skip, _1, boost::ref(primary)));
+    Mutex::ScopedLock l(lock);
+    checkState(SENDING, "Too late for prepare");
+    state = PREPARING;
     txQueue->deliver(TxPrepareEvent().message());
     return true;
 }
@@ -208,6 +214,7 @@ void PrimaryTxObserver::commit() {
     Mutex::ScopedLock l(lock);
     checkState(PREPARING, "Cannot commit, not preparing");
     if (incomplete.size() == 0) {
+        skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
         txQueue->deliver(TxCommitEvent().message());
         end(l);
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Fri Aug  8 09:24:03 2014
@@ -99,6 +99,7 @@ class PrimaryTxObserver : public broker:
     PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
     void initialize();
 
+    void skip(sys::Mutex::ScopedLock&);
     void checkState(State expect, const std::string& msg);
     void end(sys::Mutex::ScopedLock&);
     void txPrepareOkEvent(const std::string& data);
@@ -120,7 +121,7 @@ class PrimaryTxObserver : public broker:
     types::Uuid id;
     std::string exchangeName;
     QueuePtr txQueue;
-    QueueIdsMap enqueues;
+    QueueIdsMap enqueues, dequeues;
     UuidSet backups;            // All backups of transaction.
     UuidSet incomplete;         // Incomplete backups (not yet responded to prepare)
     bool empty;                 // True if the transaction is empty - no enqueues/dequeues.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug  8 09:24:03 2014
@@ -161,7 +161,7 @@ void ReplicatingSubscription::initialize
         {
             sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
             dequeues += initDequeues;       // Messages on backup that are not on primary.
-            skip = backupIds - initDequeues; // Messages already on the backup.
+            skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
             // Queue front is moving but we know this subscriptions will start at a
             // position >= front so if front is safe then position must be.
             position = front;
@@ -169,7 +169,7 @@ void ReplicatingSubscription::initialize
             QPID_LOG(debug, logPrefix << "Subscribed: front " << front
                      << ", back " << back
                      << ", guarded " << guard->getFirst()
-                     << ", on backup " << skip);
+                     << ", on backup " << skipEnqueue);
             checkReady(l);
         }
 
@@ -215,9 +215,9 @@ bool ReplicatingSubscription::deliver(
     position = m.getSequence();
     try {
         bool result = false;
-        if (skip.contains(id)) {
+        if (skipEnqueue.contains(id)) {
             QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
-            skip -= id;
+            skipEnqueue -= id;
             guard->complete(id); // This will never be acknowledged.
             notify();
             result = true;
@@ -281,6 +281,9 @@ void ReplicatingSubscription::acknowledg
 // Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
 {
+    ReplicationIdSet oldDequeues = dequeues;
+    dequeues -= skipDequeue;    // Don't send skipped dequeues
+    skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
     if (dequeues.empty()) return;
     QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
     sendEvent(DequeueEvent(dequeues), l);
@@ -332,9 +335,14 @@ bool ReplicatingSubscription::doDispatch
     }
 }
 
-void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
     Mutex::ScopedLock l(lock);
-    skip += ids;
+    skipEnqueue += ids;
+}
+
+void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
+    Mutex::ScopedLock l(lock);
+    skipDequeue += ids;
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1616703&r1=1616702&r2=1616703&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug  8 09:24:03 2014
@@ -137,8 +137,8 @@ class ReplicatingSubscription :
 
     BrokerInfo getBrokerInfo() const { return info; }
 
-    /** Skip replicating enqueue of of ids. */
-    void addSkip(const ReplicationIdSet& ids);
+void skipEnqueues(const ReplicationIdSet& ids);
+void skipDequeues(const ReplicationIdSet& ids);
 
   protected:
     bool doDispatch();
@@ -147,7 +147,8 @@ class ReplicatingSubscription :
     std::string logPrefix;
     QueuePosition position;
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
-    ReplicationIdSet skip;   // Skip enqueues: messages already on backup and tx enqueues.
+    ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
+    ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues.
     ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
     bool wasStopped;
     bool ready;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org