You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/03/22 15:47:16 UTC

svn commit: r1303815 - in /qpid/trunk/qpid/cpp/src/qpid/broker: ./ Queue.cpp Queue.h QueueListeners.cpp QueueListeners.h

Author: kgiusti
Date: Thu Mar 22 14:47:15 2012
New Revision: 1303815

URL: http://svn.apache.org/viewvc?rev=1303815&view=rev
Log:
QPID-3890: merge Queue lock scope reduction performance tweaks into trunk

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
  Merged /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker:r1299027-1303795

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1303815&r1=1303814&r2=1303815&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Mar 22 14:47:15 2012
@@ -234,11 +234,16 @@ void Queue::deliver(boost::intrusive_ptr
 
 void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
 {
+    Mutex::ScopedLock locker(messageLock);
     if (policy.get()) policy->recoverEnqueued(msg);
 }
 
-void Queue::recover(boost::intrusive_ptr<Message>& msg){
-    if (policy.get()) policy->recoverEnqueued(msg);
+void Queue::recover(boost::intrusive_ptr<Message>& msg)
+{
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (policy.get()) policy->recoverEnqueued(msg);
+    }
 
     push(msg, true);
     if (store){
@@ -278,7 +283,6 @@ void Queue::requeue(const QueuedMessage&
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
     {
-        Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         if (deleted) {
             //
@@ -296,8 +300,18 @@ void Queue::requeue(const QueuedMessage&
             }
             mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
         } else {
-            messages->release(msg);
-            listeners.populate(copy);
+            {
+                Mutex::ScopedLock locker(messageLock);
+                messages->release(msg);
+                observeRequeue(msg, locker);
+                listeners.populate(copy);
+            }
+
+            if (mgmtObject) {
+                mgmtObject->inc_releases();
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_releases();
+            }
 
             // for persistLastNode - don't force a message twice to disk, but force it if no force before
             if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
@@ -307,7 +321,6 @@ void Queue::requeue(const QueuedMessage&
                     enqueue(0, payload);
                 }
             }
-            observeRequeue(msg, locker);
         }
     }
     copy.notify();
@@ -315,10 +328,9 @@ void Queue::requeue(const QueuedMessage&
 
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
-    Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (acquire(position, message, locker)) {
+    if (acquire(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -329,17 +341,20 @@ bool Queue::acquireMessageAt(const Seque
 
 bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
 {
-    Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
-    if (!allocator->allocate( consumer, msg )) {
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = allocator->allocate( consumer, msg );
+    }
+    if (!ok) {
         QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
         return false;
     }
 
     QueuedMessage copy(msg);
-    if (acquire( msg.position, copy, locker)) {
+    if (acquire( msg.position, copy)) {
         QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
         return true;
     }
@@ -381,59 +396,73 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     while (true) {
-        Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
-        if (allocator->nextConsumableMessage(c, msg)) {
-            if (msg.payload->hasExpired()) {
-                QPID_LOG(debug, "Message expired from queue '" << name << "'");
-                c->setPosition(msg.position);
-                dequeue(0, msg);
-                if (mgmtObject) {
-                    mgmtObject->inc_discardsTtl();
-                    if (brokerMgmtObject)
-                        brokerMgmtObject->inc_discardsTtl();
-                }
+        bool found;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            found = allocator->nextConsumableMessage(c, msg);
+            if (!found) listeners.addListener(c);
+        }
+        if (!found) {
+            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+            return NO_MESSAGES;
+        }
 
-                continue;
+        if (msg.payload->hasExpired()) {
+            QPID_LOG(debug, "Message expired from queue '" << name << "'");
+            c->setPosition(msg.position);
+            dequeue(0, msg);
+            if (mgmtObject) {
+                mgmtObject->inc_discardsTtl();
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_discardsTtl();
             }
+            continue;
+        }
 
-            if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {
+        if (c->filter(msg.payload)) {
+            if (c->accept(msg.payload)) {
+                {
+                    Mutex::ScopedLock locker(messageLock);
                     bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
                     (void) ok; assert(ok);
                     observeAcquire(msg, locker);
-                    m = msg;
-                    return CONSUMED;
-                } else {
-                    //message(s) are available but consumer hasn't got enough credit
-                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
-                    messages->release(msg);
-                    return CANT_CONSUME;
                 }
+                if (mgmtObject) {
+                    mgmtObject->inc_acquires();
+                    if (brokerMgmtObject)
+                        brokerMgmtObject->inc_acquires();
+                }
+                m = msg;
+                return CONSUMED;
             } else {
-                //consumer will never want this message
-                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-                messages->release(msg);
-                return CANT_CONSUME;
+                //message(s) are available but consumer hasn't got enough credit
+                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
             }
         } else {
-            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
-            listeners.addListener(c);
-            return NO_MESSAGES;
+            //consumer will never want this message
+            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
         }
+
+        Mutex::ScopedLock locker(messageLock);
+        messages->release(msg);
+        return CANT_CONSUME;
     }
 }
 
 bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     while (true) {
-        Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
-
-        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+        bool found;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            found = allocator->nextBrowsableMessage(c, msg);
+            if (!found) listeners.addListener(c);
+        }
+        if (!found) { // no next available
             QPID_LOG(debug, "No browsable messages available for consumer " <<
                      c->getName() << " on queue '" << name << "'");
-            listeners.addListener(c);
             return false;
         }
 
@@ -491,7 +520,7 @@ bool Queue::find(SequenceNumber pos, Que
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     assertClusterSafe();
     {
-        Mutex::ScopedLock locker(consumerLock);
+        Mutex::ScopedLock locker(messageLock);
         if(exclusive) {
             throw ResourceLockedException(
                                           QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
@@ -504,47 +533,43 @@ void Queue::consume(Consumer::shared_ptr
             }
         }
         consumerCount++;
-        if (mgmtObject != 0)
-            mgmtObject->inc_consumerCount ();
         //reset auto deletion timer if necessary
         if (autoDeleteTimeout && autoDeleteTask) {
             autoDeleteTask->cancel();
         }
+        observeConsumerAdd(*c, locker);
     }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerAdded(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
-        }
-    }
+    if (mgmtObject != 0)
+        mgmtObject->inc_consumerCount ();
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
     {
-        Mutex::ScopedLock locker(consumerLock);
+        Mutex::ScopedLock locker(messageLock);
         consumerCount--;
         if(exclusive) exclusive = 0;
-        if (mgmtObject != 0)
-            mgmtObject->dec_consumerCount ();
-    }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerRemoved(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
-        }
+        observeConsumerRemove(*c, locker);
     }
+    if (mgmtObject != 0)
+        mgmtObject->dec_consumerCount ();
 }
 
 QueuedMessage Queue::get(){
-    Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    if (messages->consume(msg))
-        observeAcquire(msg, locker);
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = messages->consume(msg);
+        if (ok) observeAcquire(msg, locker);
+    }
+
+    if (ok && mgmtObject) {
+        mgmtObject->inc_acquires();
+        if (brokerMgmtObject)
+            brokerMgmtObject->inc_acquires();
+    }
+
     return msg;
 }
 
@@ -576,22 +601,26 @@ void Queue::purgeExpired(qpid::sys::Dura
             messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
         }
 
-        //
-        // Report the count of discarded-by-ttl messages
-        //
-        if (mgmtObject && !expired.empty()) {
-            mgmtObject->inc_discardsTtl(expired.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsTtl(expired.size());
-        }
+        if (!expired.empty()) {
+            if (mgmtObject) {
+                mgmtObject->inc_acquires(expired.size());
+                mgmtObject->inc_discardsTtl(expired.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(expired.size());
+                    brokerMgmtObject->inc_discardsTtl(expired.size());
+                }
+            }
 
-        for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
-             i != expired.end(); ++i) {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*i, locker);
+            for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+                 i != expired.end(); ++i) {
+                {
+                    // KAG: should be safe to retake lock after the removeIf, since
+                    // no other thread can touch these messages after the removeIf() call
+                    Mutex::ScopedLock locker(messageLock);
+                    observeAcquire(*i, locker);
+                }
+                dequeue( 0, *i );
             }
-            dequeue( 0, *i );
         }
     }
 }
@@ -717,32 +746,46 @@ uint32_t Queue::purge(const uint32_t pur
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
     Collector c(*mf.get(), purge_request);
 
-    Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    }
 
-    if (mgmtObject && !c.matches.empty()) {
-        if (dest.get()) {
-            mgmtObject->inc_reroutes(c.matches.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_reroutes(c.matches.size());
-        } else {
-            mgmtObject->inc_discardsPurge(c.matches.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsPurge(c.matches.size());
+    if (!c.matches.empty()) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(c.matches.size());
+            if (dest.get()) {
+                mgmtObject->inc_reroutes(c.matches.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(c.matches.size());
+                    brokerMgmtObject->inc_reroutes(c.matches.size());
+                }
+            } else {
+                mgmtObject->inc_discardsPurge(c.matches.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(c.matches.size());
+                    brokerMgmtObject->inc_discardsPurge(c.matches.size());
+                }
+            }
         }
-    }
 
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
-        // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
-        // now reroute if necessary
-        if (dest.get()) {
-            assert(qmsg->payload);
-            DeliverableMessage dmsg(qmsg->payload);
-            dest->routeWithAlternate(dmsg);
+        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+             qmsg != c.matches.end(); ++qmsg) {
+
+            {
+                // KAG: should be safe to retake lock after the removeIf, since
+                // no other thread can touch these messages after the removeIf call
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquire(*qmsg, locker);
+            }
+            dequeue(0, *qmsg);
+            QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
+            // now reroute if necessary
+            if (dest.get()) {
+                assert(qmsg->payload);
+                DeliverableMessage dmsg(qmsg->payload);
+                dest->routeWithAlternate(dmsg);
+            }
         }
     }
     return c.matches.size();
@@ -754,27 +797,51 @@ uint32_t Queue::move(const Queue::shared
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
     Collector c(*mf.get(), qty);
 
-    Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    }
+
 
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
+    if (!c.matches.empty()) {
         // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        // and move to destination Queue.
-        assert(qmsg->payload);
-        destq->deliver(qmsg->payload);
+
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(c.matches.size());
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires(c.matches.size());
+        }
+
+        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+             qmsg != c.matches.end(); ++qmsg) {
+            {
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquire(*qmsg, locker);
+            }
+            dequeue(0, *qmsg);
+            // and move to destination Queue.
+            assert(qmsg->payload);
+            destq->deliver(qmsg->payload);
+        }
     }
     return c.matches.size();
 }
 
 /** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                    const Mutex::ScopedLock& locker)
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
 {
-    if (messages->acquire(position, msg)) {
-        observeAcquire(msg, locker);
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = messages->acquire(position, msg);
+        if (ok) observeAcquire(msg, locker);
+    }
+    if (ok) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires();
+        }
         ++dequeueSincePurge;
         return true;
     }
@@ -784,35 +851,43 @@ bool Queue::acquire(const qpid::framing:
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    QueuedMessage removed;
+    QueuedMessage removed, qm(this, msg);
     bool dequeueRequired = false;
     {
         Mutex::ScopedLock locker(messageLock);
-        QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
-        dequeueRequired = messages->push(qm, removed);
-        if (dequeueRequired) {
+        qm.position = ++sequence;
+        if (messages->push(qm, removed)) {
+            dequeueRequired = true;
             observeAcquire(removed, locker);
-            if (mgmtObject) {
-                mgmtObject->inc_discardsLvq();
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_discardsLvq();
-            }
         }
-        listeners.populate(copy);
         observeEnqueue(qm, locker);
+        if (policy.get()) {
+            policy->enqueued(qm);
+        }
+        listeners.populate(copy);
     }
-    copy.notify();
+    if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
+    mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
+
     if (dequeueRequired) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            mgmtObject->inc_discardsLvq();
+            if (brokerMgmtObject) {  // braces needed: both broker counters must sit behind the null check
+                brokerMgmtObject->inc_acquires();
+                brokerMgmtObject->inc_discardsLvq();
+            }
+        }
         if (isRecovery) {
             //can't issue new requests for the store until
             //recovery is complete
+            Mutex::ScopedLock locker(messageLock);
             pendingDequeues.push_back(removed);
         } else {
             dequeue(0, removed);
         }
     }
+    copy.notify();
 }
 
 void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
@@ -823,8 +898,8 @@ void isEnqueueComplete(uint32_t* result,
 /** function only provided for unit tests, or code not in critical message path */
 uint32_t Queue::getEnqueueCompleteMessageCount() const
 {
-    Mutex::ScopedLock locker(messageLock);
     uint32_t count = 0;
+    Mutex::ScopedLock locker(messageLock);
     messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
     return count;
 }
@@ -837,13 +912,13 @@ uint32_t Queue::getMessageCount() const
 
 uint32_t Queue::getConsumerCount() const
 {
-    Mutex::ScopedLock locker(consumerLock);
+    Mutex::ScopedLock locker(messageLock);
     return consumerCount;
 }
 
 bool Queue::canAutoDelete() const
 {
-    Mutex::ScopedLock locker(consumerLock);
+    Mutex::ScopedLock locker(messageLock);
     return autodelete && !consumerCount && !owner;
 }
 
@@ -950,14 +1025,20 @@ bool Queue::dequeue(TransactionContext* 
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
-
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
         if (!ctxt) {
+            if (policy.get()) policy->dequeued(msg);
+            messages->deleted(msg);
             observeDequeue(msg, locker);
         }
     }
+
+    if (!ctxt) {
+        mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
+    }
+
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -974,8 +1055,13 @@ bool Queue::dequeue(TransactionContext* 
 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
-    Mutex::ScopedLock locker(messageLock);
-    observeDequeue(msg, locker);
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (policy.get()) policy->dequeued(msg);
+        messages->deleted(msg);
+        observeDequeue(msg, locker);
+    }
+    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
     if (mgmtObject != 0) {
         _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
         const uint64_t contentSize = msg.payload->contentSize();
@@ -995,10 +1081,20 @@ void Queue::dequeueCommitted(const Queue
  * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
  * it from the logical (and persistent if applicable) queue
  */
-bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
+bool Queue::popAndDequeue(QueuedMessage& msg)
 {
-    if (messages->consume(msg)) {
-        observeAcquire(msg, locker);
+    bool popped;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        popped = messages->consume(msg);
+        if (popped) observeAcquire(msg, locker);
+    }
+    if (popped) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires();
+        }
         dequeue(0, msg);
         return true;
     } else {
@@ -1008,13 +1104,10 @@ bool Queue::popAndDequeue(QueuedMessage&
 
 /**
  * Updates policy and management when a message has been dequeued,
- * expects messageLock to be held
+ * Requires messageLock be held by caller.
  */
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
-    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
-    if (policy.get()) policy->dequeued(msg);
-    messages->deleted(msg);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->dequeued(msg);
@@ -1024,17 +1117,11 @@ void Queue::observeDequeue(const QueuedM
     }
 }
 
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
  */
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
-    if (mgmtObject) {
-        mgmtObject->inc_acquires();
-        if (brokerMgmtObject)
-            brokerMgmtObject->inc_acquires();
-    }
-
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->acquired(msg);
@@ -1044,17 +1131,11 @@ void Queue::observeAcquire(const QueuedM
     }
 }
 
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become re-available for transfer
+ *  Requires messageLock be held by caller.
  */
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
-    if (mgmtObject) {
-        mgmtObject->inc_releases();
-        if (brokerMgmtObject)
-            brokerMgmtObject->inc_releases();
-    }
-
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->requeued(msg);
@@ -1064,6 +1145,33 @@ void Queue::observeRequeue(const QueuedM
     }
 }
 
+/** updates queue observers when a new consumer has subscribed to this queue.
+ */
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerAdded(c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
+/** updates queue observers when a consumer has unsubscribed from this queue.
+ */
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerRemoved(c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
+
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;
@@ -1211,23 +1319,21 @@ void Queue::configureImpl(const FieldTab
 void Queue::destroyed()
 {
     unbind(broker->getExchanges());
-    {
-        Mutex::ScopedLock locker(messageLock);
-        QueuedMessage m;
-        while(popAndDequeue(m, locker)) {
-            DeliverableMessage msg(m.payload);
-            if (alternateExchange.get()) {
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandonedViaAlt();
-                alternateExchange->routeWithAlternate(msg);
-            } else {
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandoned();
-            }
+
+    QueuedMessage m;
+    while(popAndDequeue(m)) {
+        DeliverableMessage msg(m.payload);
+        if (alternateExchange.get()) {
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_abandonedViaAlt();
+            alternateExchange->routeWithAlternate(msg);
+        } else {
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_abandoned();
         }
-        if (alternateExchange.get())
-            alternateExchange->decAlternateUsers();
     }
+    if (alternateExchange.get())
+        alternateExchange->decAlternateUsers();
 
     if (store) {
         barrier.destroy();
@@ -1238,7 +1344,7 @@ void Queue::destroyed()
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
     notifyDeleted();
     {
-        Mutex::ScopedLock locker(messageLock);
+        Mutex::ScopedLock lock(messageLock);
         observers.clear();
     }
 }
@@ -1248,8 +1354,8 @@ void Queue::notifyDeleted()
     QueueListeners::ListenerSet set;
     {
         Mutex::ScopedLock locker(messageLock);
-        listeners.snapshot(set);
         deleted = true;
+        listeners.snapshot(set);
     }
     set.notifyAll();
 }
@@ -1267,6 +1373,7 @@ void Queue::unbind(ExchangeRegistry& exc
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
 {
+    Mutex::ScopedLock locker(messageLock);
     policy = _policy;
     if (policy.get())
         policy->setQueue(this);
@@ -1274,6 +1381,7 @@ void Queue::setPolicy(std::auto_ptr<Queu
 
 const QueuePolicy* Queue::getPolicy()
 {
+    Mutex::ScopedLock locker(messageLock);
     return policy.get();
 }
 
@@ -1555,8 +1663,12 @@ void Queue::recoveryComplete(ExchangeReg
                       << "\": exchange does not exist.");
     }
     //process any pending dequeues
-    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-    pendingDequeues.clear();
+    std::deque<QueuedMessage> pd;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        pendingDequeues.swap(pd);
+    }
+    for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
 }
 
 void Queue::insertSequenceNumbers(const std::string& key)
@@ -1566,10 +1678,10 @@ void Queue::insertSequenceNumbers(const 
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
 
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
+/** updates queue observers and state when a message has become available for transfer
+ *  Requires messageLock be held by caller.
  */
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1578,10 +1690,6 @@ void Queue::observeEnqueue(const QueuedM
             QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
         }
     }
-    if (policy.get()) {
-        policy->enqueued(m);
-    }
-    mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
 }
 
 void Queue::updateEnqueued(const QueuedMessage& m)
@@ -1589,12 +1697,16 @@ void Queue::updateEnqueued(const QueuedM
     if (m.payload) {
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue(0, payload, true);
-        messages->updateAcquired(m);
-        if (policy.get()) {
-            policy->recoverEnqueued(payload);
+        {
+            Mutex::ScopedLock locker(messageLock);
+            messages->updateAcquired(m);
+            observeEnqueue(m, locker);
+            if (policy.get()) {
+                policy->recoverEnqueued(payload);
+                policy->enqueued(m);
+            }
         }
-        Mutex::ScopedLock locker(messageLock);
-        observeEnqueue(m, locker);
+        mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1602,10 +1714,16 @@ void Queue::updateEnqueued(const QueuedM
 
 bool Queue::isEnqueued(const QueuedMessage& msg)
 {
+    Mutex::ScopedLock locker(messageLock);
     return !policy.get() || policy->isEnqueued(msg);
 }
 
+// Note: accessing listeners outside of lock is dangerous.  Caller must ensure the queue's
+// state is not changed while listeners is referenced.
 QueueListeners& Queue::getListeners() { return listeners; }
+
+// Note: accessing messages outside of lock is dangerous.  Caller must ensure the queue's
+// state is not changed while messages is referenced.
 Messages& Queue::getMessages() { return *messages; }
 const Messages& Queue::getMessages() const { return *messages; }
 
@@ -1618,13 +1736,13 @@ void Queue::checkNotDeleted(const Consum
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
 {
-    Mutex::ScopedLock locker(messageLock);
+    Mutex::ScopedLock lock(messageLock);
     observers.insert(observer);
 }
 
 void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
 {
-    Mutex::ScopedLock locker(messageLock);
+    Mutex::ScopedLock lock(messageLock);
     observers.erase(observer);
 }
 
@@ -1687,7 +1805,7 @@ Queue::UsageBarrier::UsageBarrier(Queue&
 
 bool Queue::UsageBarrier::acquire()
 {
-    Monitor::ScopedLock l(parent.messageLock);
+    Monitor::ScopedLock l(parent.messageLock);  /** @todo: use a dedicated lock instead of messageLock */
     if (parent.deleted) {
         return false;
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1303815&r1=1303814&r2=1303815&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Mar 22 14:47:15 2012
@@ -107,7 +107,22 @@ class Queue : public boost::enable_share
     QueueListeners listeners;
     std::auto_ptr<Messages> messages;
     std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
-    mutable qpid::sys::Mutex consumerLock;
+    /** messageLock is used to keep the Queue's state consistent while processing message
+     * events, such as message dispatch, enqueue, acquire, and dequeue.  It must be held
+     * while updating certain members in order to keep these members consistent with
+     * each other:
+     *     o  messages
+     *     o  sequence
+     *     o  policy
+     *     o  listeners
+     *     o  allocator
+     *     o  observeXXX() methods
+     *     o  observers
+     *     o  pendingDequeues  (TBD: move under separate lock)
+     *     o  exclusive OwnershipToken (TBD: move under separate lock)
+     *     o  consumerCount  (TBD: move under separate lock)
+     *     o  Queue::UsageBarrier (TBD: move under separate lock)
+     */
     mutable qpid::sys::Monitor messageLock;
     mutable qpid::sys::Mutex ownershipLock;
     mutable uint64_t persistenceId;
@@ -143,17 +158,17 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
-     * must be held by caller */
+    /** update queue observers, stats, policy, etc when the messages' state changes.
+     * messageLock must be held by the caller */
     void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
     void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
     void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
     void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock);
-    // acquire message @ position, return true and set msg if acquire succeeds
-    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                 const sys::Mutex::ScopedLock& held);
+    void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
+    void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
 
+    bool popAndDequeue(QueuedMessage&);
+    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
     void configureImpl(const qpid::framing::FieldTable& settings);
@@ -355,6 +370,7 @@ class Queue : public boost::enable_share
 
     /** Apply f to each Observer on the queue */
     template <class F> void eachObserver(F f) {
+        sys::Mutex::ScopedLock l(messageLock);
         std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=1303815&r1=1303814&r2=1303815&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp Thu Mar 22 14:47:15 2012
@@ -79,10 +79,6 @@ void QueueListeners::NotificationSet::no
     std::for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
 }
 
-bool QueueListeners::contains(Consumer::shared_ptr c) const {
-    return c->inListeners;
-}
-
 void QueueListeners::ListenerSet::notifyAll()
 {
     std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=1303815&r1=1303814&r2=1303815&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h Thu Mar 22 14:47:15 2012
@@ -30,7 +30,7 @@ namespace broker {
 /**
  * Track and notify components that wish to be notified of messages
  * that become available on a queue.
- * 
+ *
  * None of the methods defined here are protected by locking. However
  * the populate method allows a 'snapshot' to be taken of the
  * listeners to be notified. NotificationSet::notify() may then be
@@ -61,11 +61,10 @@ class QueueListeners
       friend class QueueListeners;
     };
 
-    void addListener(Consumer::shared_ptr);    
-    void removeListener(Consumer::shared_ptr);    
+    void addListener(Consumer::shared_ptr);
+    void removeListener(Consumer::shared_ptr);
     void populate(NotificationSet&);
     void snapshot(ListenerSet&);
-    bool contains(Consumer::shared_ptr c) const;
     void notifyAll();
 
     template <class F> void eachListener(F f) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org