You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/31 20:43:09 UTC

svn commit: r1441158 - in /qpid/trunk/qpid/cpp/src/qpid/ha: QueueGuard.cpp QueueGuard.h ReplicatingSubscription.h

Author: aconway
Date: Thu Jan 31 19:43:09 2013
New Revision: 1441158

URL: http://svn.apache.org/viewvc?rev=1441158&view=rev
Log:
QPID-4555: HA Fix race condition in QueueGuard

- If the guard was already cancelled, enqueued() could delay a message's completion without recording it, so the completion was never finished.
- Make all actions involving the delayed set and the AsyncCompletion atomic.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Jan 31 19:43:09 2013
@@ -67,44 +67,30 @@ QueueGuard::~QueueGuard() { cancel(); }
 void QueueGuard::enqueued(const Message& m) {
     // Delay completion
     QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+    Mutex::ScopedLock l(lock);
+    if (cancelled) return;  // Don't record enqueues after we are cancelled.
+    assert(delayed.find(m.getSequence()) == delayed.end());
+    delayed[m.getSequence()] = m.getIngressCompletion();
     m.getIngressCompletion()->startCompleter();
-    {
-        Mutex::ScopedLock l(lock);
-        if (cancelled) return;  // Don't record enqueues after we are cancelled.
-        assert(delayed.find(m.getSequence()) == delayed.end());
-        delayed[m.getSequence()] = m.getIngressCompletion();
-    }
 }
 
 // NOTE: Called with message lock held.
 void QueueGuard::dequeued(const Message& m) {
     QPID_LOG(trace, logPrefix << "Dequeued " << m);
-    ReplicatingSubscription* rs=0;
-    {
-        Mutex::ScopedLock l(lock);
-        rs = subscription;
-    }
-    if (rs) rs->dequeued(m);
-    complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
-    for (Delayed::iterator i = begin; i != end; ++i) {
-        QPID_LOG(trace, logPrefix << "Completed " << i->first);
-        i->second->finishCompleter();
-    }
+    Mutex::ScopedLock l(lock);
+    if (subscription) subscription->dequeued(m);
+    complete(m.getSequence(), l);
 }
 
 void QueueGuard::cancel() {
     queue.removeObserver(observer);
-    Delayed removed;
-    {
-        Mutex::ScopedLock l(lock);
-        if (cancelled) return;
-        cancelled = true;
-        delayed.swap(removed);
+    Mutex::ScopedLock l(lock);
+    if (cancelled) return;
+    cancelled = true;
+    for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+        complete(i, l);
+        delayed.erase(i++);
     }
-    completeRange(removed.begin(), removed.end());
 }
 
 void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -115,37 +101,34 @@ void QueueGuard::attach(ReplicatingSubsc
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
     // Complete any messages before or at the ReplicatingSubscription start position.
     // Those messages are already on the backup.
-    Delayed removed;
-    {
-        Mutex::ScopedLock l(lock);
-        Delayed::iterator i = delayed.begin();
-        while(i != delayed.end() && i->first <= position) {
-            removed.insert(*i);
-            delayed.erase(i++);
-        }
+    Mutex::ScopedLock l(lock);
+    Delayed::iterator i = delayed.begin();
+    while(i != delayed.end() && i->first <= position) {
+        complete(i, l);
+        delayed.erase(i++);
     }
-    completeRange(removed.begin(), removed.end());
     return position >= range.back;
 }
 
 void QueueGuard::complete(SequenceNumber sequence) {
-    boost::intrusive_ptr<broker::AsyncCompletion> m;
-    {
-        Mutex::ScopedLock l(lock);
-        // The same message can be completed twice, by
-        // ReplicatingSubscription::acknowledged and dequeued. Remove it
-        // from the map so we only call finishCompleter() once
-        Delayed::iterator i = delayed.find(sequence);
-        if (i != delayed.end()) {
-            m = i->second;
-            delayed.erase(i);
-        }
+    Mutex::ScopedLock l(lock);
+    complete(sequence, l);
+}
 
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+    // The same message can be completed twice, by
+    // ReplicatingSubscription::acknowledged and dequeued. Remove it
+    // from the map so we only call finishCompleter() once
+    Delayed::iterator i = delayed.find(sequence);
+    if (i != delayed.end()) {
+        complete(i, l);
+        delayed.erase(i);
     }
-    if (m) {
-        QPID_LOG(trace, logPrefix << "Completed " << sequence);
-        m->finishCompleter();
-    }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+    QPID_LOG(trace, logPrefix << "Completed " << i->first);
+    i->second->finishCompleter();
 }
 
 }} // namespaces qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Thu Jan 31 19:43:09 2013
@@ -54,6 +54,9 @@ class ReplicatingSubscription;
  * THREAD SAFE: Concurrent calls:
  *  - enqueued() via QueueObserver in arbitrary connection threads.
  *  - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *
+ * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its lock held.
+ * QueueGuard MAY call ReplicatingSubscription with its lock held.
  */
 class QueueGuard {
   public:
@@ -104,18 +107,20 @@ class QueueGuard {
 
   private:
     class QueueObserver;
+    typedef std::map<framing::SequenceNumber,
+                     boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+
+    void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+    void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
 
     sys::Mutex lock;
     bool cancelled;
     std::string logPrefix;
     broker::Queue& queue;
-    typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
     Delayed delayed;
     ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
     QueueRange range;
-
-    void completeRange(Delayed::iterator begin, Delayed::iterator end);
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1441158&r1=1441157&r2=1441158&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Jan 31 19:43:09 2013
@@ -61,6 +61,8 @@ class QueueGuard;
  *
  * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
  *
+ * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its lock held.
+ * QueueGuard MAY call ReplicatingSubscription with its lock held.
  */
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
 {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org