You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/06/23 12:46:16 UTC

svn commit: r787625 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h QueuePolicy.cpp QueuePolicy.h

Author: gsim
Date: Tue Jun 23 10:46:15 2009
New Revision: 787625

URL: http://svn.apache.org/viewvc?rev=787625&view=rev
Log:
QPID-1936: Fix potential deadlock for durable ring queue


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 23 10:46:15 2009
@@ -551,11 +551,16 @@
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+    Messages dequeues;
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
-        if (policy.get()) policy->tryEnqueue(qm);
+        if (policy.get()) {
+            policy->tryEnqueue(qm);
+            //depending on policy, may have some dequeues
+            if (!isRecovery) pendingDequeues.swap(dequeues);
+        }
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
@@ -591,6 +596,10 @@
         }
     }
     copy.notify();
+    if (!dequeues.empty()) {
+        //depending on policy, may have some dequeues
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+    }
 }
 
 QueuedMessage Queue::getFront()
@@ -1026,4 +1035,10 @@
     return !policy.get() || policy->isEnqueued(msg);
 }
 
+void Queue::addPendingDequeue(const QueuedMessage& msg)
+{
+    //assumes lock is held - true at present but rather nasty as this is a public method
+    pendingDequeues.push_back(msg);    
+}
+
 QueueListeners& Queue::getListeners() { return listeners; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 23 10:46:15 2009
@@ -326,6 +326,18 @@
              */
             void recoveryComplete();
 
+            /**
+             * This is a hack to avoid deadlocks in durable ring
+             * queues. It is used for dequeueing messages in response
+             * to an enqueue while avoid holding lock over call to
+             * store.
+             * 
+             * Assumes messageLock is held - true for curent use case
+             * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
+             * method
+             **/
+            void addPendingDequeue(const QueuedMessage &msg);
+
             // For cluster update
             QueueListeners& getListeners();
         };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Jun 23 10:46:15 2009
@@ -207,13 +207,9 @@
 {
     qpid::sys::Mutex::ScopedLock l(lock);
     //find and remove m from queue
-    for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
-        if (i->payload == m.payload) {
-            queue.erase(i);
-            //now update count and size
-            QueuePolicy::dequeued(m);
-            break;
-        }
+    if (find(m, pendingDequeues, true) || find(m, queue, true)) {
+        //now update count and size
+        QueuePolicy::dequeued(m);
     }
 }
 
@@ -223,12 +219,7 @@
     //for non-strict ring policy, a message can be replaced (and
     //therefore dequeued) before it is accepted or released by
     //subscriber; need to detect this
-    for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
-        if (i->payload == m.payload) {
-            return true;
-        }
-    }
-    return false;
+    return find(m, pendingDequeues, false) || find(m, queue, false); 
 }
 
 bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
@@ -248,7 +239,18 @@
         oldest = queue.front();
     }
     if (oldest.queue->acquire(oldest) || !strict) {
-        oldest.queue->dequeue(0, oldest);
+        {
+            //TODO: fix this!  In the current code, this method is
+            //only ever called with the Queue lock already taken. This
+            //should not be relied upon going forward however and
+            //clearly the locking in this class is insufficient as
+            //there is no guarantee that the message previously atthe
+            //front is still there.
+            qpid::sys::Mutex::ScopedLock l(lock);
+            queue.pop_front();
+            pendingDequeues.push_back(oldest);
+        }
+        oldest.queue->addPendingDequeue(oldest);
         QPID_LOG(debug, "Ring policy triggered in queue " 
                  << (m.queue ? m.queue->getName() : std::string("unknown queue"))
                  << ": removed message " << oldest.position << " to make way for " << m.position);
@@ -264,6 +266,17 @@
     }
 }
 
+bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
+{
+    for (Messages::iterator i = q.begin(); i != q.end(); i++) {
+        if (i->payload == m.payload) {
+            if (remove) q.erase(i);
+            return true;
+        }
+    }
+    return false;
+}
+
 std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
 {
     uint32_t maxCount = getInt(settings, maxCountKey, 0);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=787625&r1=787624&r2=787625&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Tue Jun 23 10:46:15 2009
@@ -101,8 +101,11 @@
   private:
     typedef std::deque<QueuedMessage> Messages;
     qpid::sys::Mutex lock;
+    Messages pendingDequeues;
     Messages queue;
     const bool strict;
+
+    bool find(const QueuedMessage&, Messages&, bool remove);
 };
 
 }}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org