You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/28 14:12:42 UTC

svn commit: r819505 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/QueuePolicy.cpp qpid/broker/QueuePolicy.h tests/ring_queue_test

Author: gsim
Date: Mon Sep 28 12:12:41 2009
New Revision: 819505

URL: http://svn.apache.org/viewvc?rev=819505&view=rev
Log:
QPID-2102: Changed QueuePolicy to rely on external locking and to require dequeues to be handled by the policy user instead.
           (r817742 introduced a deadlock in ring queue policy which this checkin fixes)


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
    qpid/trunk/qpid/cpp/src/tests/ring_queue_test

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Sep 28 12:12:41 2009
@@ -208,11 +208,10 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
-    if (!isEnqueued(msg)) return;
-
     QueueListeners::NotificationSet copy;
     {    
         Mutex::ScopedLock locker(messageLock);
+        if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
         listeners.populate(copy);
@@ -603,7 +602,6 @@
             else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
         }
         if (policy.get()) {
-            Mutex::ScopedUnlock locker(messageLock);   
             policy->enqueued(qm);
         }
     }
@@ -696,7 +694,14 @@
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
 {
     if (policy.get() && !suppressPolicyCheck) {
-        policy->tryEnqueue(msg);
+        Messages dequeues;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            policy->tryEnqueue(msg);
+            policy->getPendingDequeues(dequeues);
+        }
+        //depending on policy, may have some dequeues that need to be performed without holding the lock
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
     }
 
     if (inLastNodeFailure && persistLastNode){
@@ -1072,10 +1077,4 @@
     return !policy.get() || policy->isEnqueued(msg);
 }
 
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
-    //assumes lock is held - true at present but rather nasty as this is a public method
-    pendingDequeues.push_back(msg);    
-}
-
 QueueListeners& Queue::getListeners() { return listeners; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Sep 28 12:12:41 2009
@@ -334,18 +334,6 @@
              */
             void recoveryComplete();
 
-            /**
-             * This is a hack to avoid deadlocks in durable ring
-             * queues. It is used for dequeueing messages in response
-             * to an enqueue while avoid holding lock over call to
-             * store.
-             * 
-             * Assumes messageLock is held - true for curent use case
-             * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
-             * method
-             **/
-            void addPendingDequeue(const QueuedMessage &msg);
-
             // For cluster update
             QueueListeners& getListeners();
         };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Mon Sep 28 12:12:41 2009
@@ -77,7 +77,6 @@
 
 void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     if (checkLimit(m)) {
         enqueued(m->contentSize());
     } else {
@@ -87,13 +86,11 @@
 
 void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     enqueued(m->contentSize());
 }
 
 void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     dequeued(m->contentSize());
 }
 
@@ -101,7 +98,6 @@
 
 void QueuePolicy::dequeued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     dequeued(m.payload->contentSize());
 }
 
@@ -141,6 +137,7 @@
     defaultMaxSize = s;
 }
 
+void QueuePolicy::getPendingDequeues(Messages&) {}
 
 
 
@@ -200,14 +197,12 @@
 
 void RingQueuePolicy::enqueued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     //need to insert in correct location based on position
     queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
 }
 
 void RingQueuePolicy::dequeued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     //find and remove m from queue
     if (find(m, pendingDequeues, true) || find(m, queue, true)) {
         //now update count and size
@@ -217,7 +212,6 @@
 
 bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
     //for non-strict ring policy, a message can be replaced (and
     //therefore dequeued) before it is accepted or released by
     //subscriber; need to detect this
@@ -241,8 +235,6 @@
         pendingDequeues.push_back(oldest);
         QPID_LOG(debug, "Ring policy triggered in " << name 
                  << ": removed message " << oldest.position << " to make way for new message");
-        qpid::sys::Mutex::ScopedUnlock u(lock);
-        oldest.queue->dequeue(0, oldest);
         return true;
     } else {
         QPID_LOG(debug, "Ring policy could not be triggered in " << name 
@@ -254,6 +246,11 @@
     }
 }
 
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+    result = pendingDequeues;
+}
+
 bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
 {
     for (Messages::iterator i = q.begin(); i != q.end(); i++) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Mon Sep 28 12:12:41 2009
@@ -47,6 +47,7 @@
     static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
 
   public:
+    typedef std::deque<QueuedMessage> Messages;
     static QPID_BROKER_EXTERN const std::string maxCountKey;
     static QPID_BROKER_EXTERN const std::string maxSizeKey;
     static QPID_BROKER_EXTERN const std::string typeKey;
@@ -68,7 +69,7 @@
     void encode(framing::Buffer& buffer) const;
     void decode ( framing::Buffer& buffer );
     uint32_t encodedSize() const;
-
+    virtual void getPendingDequeues(Messages& result);
 
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
     static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
@@ -80,7 +81,6 @@
                                                        const QueuePolicy&);
   protected:
     const std::string name;
-    qpid::sys::Mutex lock;
 
     QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
 
@@ -105,8 +105,8 @@
     void dequeued(const QueuedMessage&);
     bool isEnqueued(const QueuedMessage&);
     bool checkLimit(boost::intrusive_ptr<Message> msg);
+    void getPendingDequeues(Messages& result);
   private:
-    typedef std::deque<QueuedMessage> Messages;
     Messages pendingDequeues;
     Messages queue;
     const bool strict;

Modified: qpid/trunk/qpid/cpp/src/tests/ring_queue_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ring_queue_test?rev=819505&r1=819504&r2=819505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ring_queue_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ring_queue_test Mon Sep 28 12:12:41 2009
@@ -48,7 +48,7 @@
 
 cleanup() {
     rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_*
-    qpid-config $BROKER_URL add queue $QUEUE_NAME
+    qpid-config $BROKER_URL del queue $QUEUE_NAME --force
 }
 
 log() {
@@ -64,10 +64,11 @@
     if [[ $RECEIVERS -eq 0 ]]; then 
         #queue should have $LIMIT messages on it, but need to send an eos also
         sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null
-        if [[ $(receiver --queue $QUEUE_NAME --browse | wc -l) -eq $(( $LIMIT - 1)) ]]; then
+        received=$(receiver --queue $QUEUE_NAME --browse | wc -l)
+        if [[ received -eq $(( $LIMIT - 1)) ]]; then
             log "queue contains $LIMIT messages as expected"
         else
-            fail "queue does not contain the expected $LIMIT messages"
+            fail "queue does not contain the expected $LIMIT messages (received $received)"
         fi        
     elif [[ $CONCURRENT -eq 0 ]]; then
         #sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org