You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/03/13 21:08:33 UTC

svn commit: r1300327 - in /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h QueueListeners.cpp QueueListeners.h

Author: kgiusti
Date: Tue Mar 13 20:08:33 2012
New Revision: 1300327

URL: http://svn.apache.org/viewvc?rev=1300327&view=rev
Log:
Merge branch 'perf-locks' into qpid-3890

Modified:
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.cpp
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.h

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1300327&r1=1300326&r2=1300327&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp Tue Mar 13 20:08:33 2012
@@ -232,11 +232,16 @@ void Queue::deliver(boost::intrusive_ptr
 
 void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
 {
+    Mutex::ScopedLock locker(messageLock);
     if (policy.get()) policy->recoverEnqueued(msg);
 }
 
-void Queue::recover(boost::intrusive_ptr<Message>& msg){
-    if (policy.get()) policy->recoverEnqueued(msg);
+void Queue::recover(boost::intrusive_ptr<Message>& msg)
+{
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (policy.get()) policy->recoverEnqueued(msg);
+    }
 
     push(msg, true);
     if (store){
@@ -276,7 +281,6 @@ void Queue::requeue(const QueuedMessage&
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
     {
-        Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         if (deleted) {
             //
@@ -294,8 +298,18 @@ void Queue::requeue(const QueuedMessage&
             }
             mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
         } else {
-            messages->release(msg);
-            listeners.populate(copy);
+            {
+                Mutex::ScopedLock locker(messageLock);
+                messages->release(msg);
+                observeRequeueLH(msg);
+                listeners.populate(copy);
+            }
+
+            if (mgmtObject) {
+                mgmtObject->inc_releases();
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_releases();
+            }
 
             // for persistLastNode - don't force a message twice to disk, but force it if no force before
             if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
@@ -305,7 +319,6 @@ void Queue::requeue(const QueuedMessage&
                     enqueue(0, payload);
                 }
             }
-            observeRequeue(msg, locker);
         }
     }
     copy.notify();
@@ -313,10 +326,9 @@ void Queue::requeue(const QueuedMessage&
 
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
-    Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (acquire(position, message, locker)) {
+    if (acquire(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -327,17 +339,20 @@ bool Queue::acquireMessageAt(const Seque
 
 bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
 {
-    Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
-
-    if (!allocator->allocate( consumer, msg )) {
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = allocator->allocate( consumer, msg );
+    }
+    if (!ok) {
         QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
         return false;
     }
 
     QueuedMessage copy(msg);
-    if (acquire( msg.position, copy, locker)) {
+    if (acquire( msg.position, copy)) {
         QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
         return true;
     }
@@ -379,59 +394,73 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     while (true) {
-        Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
-        if (allocator->nextConsumableMessage(c, msg)) {
-            if (msg.payload->hasExpired()) {
-                QPID_LOG(debug, "Message expired from queue '" << name << "'");
-                c->setPosition(msg.position);
-                dequeue(0, msg);
-                if (mgmtObject) {
-                    mgmtObject->inc_discardsTtl();
-                    if (brokerMgmtObject)
-                        brokerMgmtObject->inc_discardsTtl();
-                }
+        bool found;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            found = allocator->nextConsumableMessage(c, msg);
+            if (!found) listeners.addListener(c);
+        }
+        if (!found) {
+            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+            return NO_MESSAGES;
+        }
 
-                continue;
+        if (msg.payload->hasExpired()) {
+            QPID_LOG(debug, "Message expired from queue '" << name << "'");
+            c->setPosition(msg.position);
+            dequeue(0, msg);
+            if (mgmtObject) {
+                mgmtObject->inc_discardsTtl();
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_discardsTtl();
             }
+            continue;
+        }
 
-            if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {
+        if (c->filter(msg.payload)) {
+            if (c->accept(msg.payload)) {
+                {
+                    Mutex::ScopedLock locker(messageLock);
                     bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
                     (void) ok; assert(ok);
-                    observeAcquire(msg, locker);
-                    m = msg;
-                    return CONSUMED;
-                } else {
-                    //message(s) are available but consumer hasn't got enough credit
-                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
-                    messages->release(msg);
-                    return CANT_CONSUME;
+                    observeAcquireLH(msg);
                 }
+                if (mgmtObject) {
+                    mgmtObject->inc_acquires();
+                    if (brokerMgmtObject)
+                        brokerMgmtObject->inc_acquires();
+                }
+                m = msg;
+                return CONSUMED;
             } else {
-                //consumer will never want this message
-                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-                messages->release(msg);
-                return CANT_CONSUME;
+                //message(s) are available but consumer hasn't got enough credit
+                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
             }
         } else {
-            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
-            listeners.addListener(c);
-            return NO_MESSAGES;
+            //consumer will never want this message
+            QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
         }
+
+        Mutex::ScopedLock locker(messageLock);
+        messages->release(msg);
+        return CANT_CONSUME;
     }
 }
 
 bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
 {
     while (true) {
-        Mutex::ScopedLock locker(messageLock);
         QueuedMessage msg;
-
-        if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+        bool found;
+        {
+            Mutex::ScopedLock locker(messageLock);
+            found = allocator->nextBrowsableMessage(c, msg);
+            if (!found) listeners.addListener(c);
+        }
+        if (!found) { // no next available
             QPID_LOG(debug, "No browsable messages available for consumer " <<
                      c->getName() << " on queue '" << name << "'");
-            listeners.addListener(c);
             return false;
         }
 
@@ -489,7 +518,7 @@ bool Queue::find(SequenceNumber pos, Que
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     assertClusterSafe();
     {
-        Mutex::ScopedLock locker(consumerLock);
+        Mutex::ScopedLock locker(messageLock);
         if(exclusive) {
             throw ResourceLockedException(
                                           QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
@@ -502,47 +531,43 @@ void Queue::consume(Consumer::shared_ptr
             }
         }
         consumerCount++;
-        if (mgmtObject != 0)
-            mgmtObject->inc_consumerCount ();
         //reset auto deletion timer if necessary
         if (autoDeleteTimeout && autoDeleteTask) {
             autoDeleteTask->cancel();
         }
+        observeConsumerAddLH(*c);
     }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerAdded(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
-        }
-    }
+    if (mgmtObject != 0)
+        mgmtObject->inc_consumerCount ();
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
     {
-        Mutex::ScopedLock locker(consumerLock);
+        Mutex::ScopedLock locker(messageLock);
         consumerCount--;
         if(exclusive) exclusive = 0;
-        if (mgmtObject != 0)
-            mgmtObject->dec_consumerCount ();
-    }
-    Mutex::ScopedLock locker(messageLock);
-    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-        try{
-            (*i)->consumerRemoved(*c);
-        } catch (const std::exception& e) {
-            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
-        }
+        observeConsumerRemoveLH(*c);
     }
+    if (mgmtObject != 0)
+        mgmtObject->dec_consumerCount ();
 }
 
 QueuedMessage Queue::get(){
-    Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    if (messages->consume(msg))
-        observeAcquire(msg, locker);
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = messages->consume(msg);
+        if (ok) observeAcquireLH(msg);
+    }
+
+    if (ok && mgmtObject) {
+        mgmtObject->inc_acquires();
+        if (brokerMgmtObject)
+            brokerMgmtObject->inc_acquires();
+    }
+
     return msg;
 }
 
@@ -574,22 +599,26 @@ void Queue::purgeExpired(qpid::sys::Dura
             messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
         }
 
-        //
-        // Report the count of discarded-by-ttl messages
-        //
-        if (mgmtObject && !expired.empty()) {
-            mgmtObject->inc_discardsTtl(expired.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsTtl(expired.size());
-        }
+        if (!expired.empty()) {
+            if (mgmtObject) {
+                mgmtObject->inc_acquires(expired.size());
+                mgmtObject->inc_discardsTtl(expired.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(expired.size());
+                    brokerMgmtObject->inc_discardsTtl(expired.size());
+                }
+            }
 
-        for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
-             i != expired.end(); ++i) {
-            {
-                Mutex::ScopedLock locker(messageLock);
-                observeAcquire(*i, locker);
+            for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+                 i != expired.end(); ++i) {
+                {
+                    // KAG: should be safe to retake lock after the removeIf, since
+                    // no other thread can touch these messages after the removeIf() call
+                    Mutex::ScopedLock locker(messageLock);
+                    observeAcquireLH(*i);
+                }
+                dequeue( 0, *i );
             }
-            dequeue( 0, *i );
         }
     }
 }
@@ -715,32 +744,46 @@ uint32_t Queue::purge(const uint32_t pur
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
     Collector c(*mf.get(), purge_request);
 
-    Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    }
 
-    if (mgmtObject && !c.matches.empty()) {
-        if (dest.get()) {
-            mgmtObject->inc_reroutes(c.matches.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_reroutes(c.matches.size());
-        } else {
-            mgmtObject->inc_discardsPurge(c.matches.size());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsPurge(c.matches.size());
+    if (!c.matches.empty()) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(c.matches.size());
+            if (dest.get()) {
+                mgmtObject->inc_reroutes(c.matches.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(c.matches.size());
+                    brokerMgmtObject->inc_reroutes(c.matches.size());
+                }
+            } else {
+                mgmtObject->inc_discardsPurge(c.matches.size());
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_acquires(c.matches.size());
+                    brokerMgmtObject->inc_discardsPurge(c.matches.size());
+                }
+            }
         }
-    }
 
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
-        // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
-        // now reroute if necessary
-        if (dest.get()) {
-            assert(qmsg->payload);
-            DeliverableMessage dmsg(qmsg->payload);
-            dest->routeWithAlternate(dmsg);
+        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+             qmsg != c.matches.end(); ++qmsg) {
+
+            {
+                // KAG: should be safe to retake lock after the removeIf, since
+                // no other thread can touch these messages after the removeIf call
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquireLH(*qmsg);
+            }
+            dequeue(0, *qmsg);
+            QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
+            // now reroute if necessary
+            if (dest.get()) {
+                assert(qmsg->payload);
+                DeliverableMessage dmsg(qmsg->payload);
+                dest->routeWithAlternate(dmsg);
+            }
         }
     }
     return c.matches.size();
@@ -752,27 +795,51 @@ uint32_t Queue::move(const Queue::shared
     std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
     Collector c(*mf.get(), qty);
 
-    Mutex::ScopedLock locker(messageLock);
-    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    {
+        Mutex::ScopedLock locker(messageLock);
+        messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+    }
+
 
-    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); ++qmsg) {
+    if (!c.matches.empty()) {
         // Update observers and message state:
-        observeAcquire(*qmsg, locker);
-        dequeue(0, *qmsg);
-        // and move to destination Queue.
-        assert(qmsg->payload);
-        destq->deliver(qmsg->payload);
+
+        if (mgmtObject) {
+            mgmtObject->inc_acquires(c.matches.size());
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires(c.matches.size());
+        }
+
+        for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+             qmsg != c.matches.end(); ++qmsg) {
+            {
+                Mutex::ScopedLock locker(messageLock);
+                observeAcquireLH(*qmsg);
+            }
+            dequeue(0, *qmsg);
+            // and move to destination Queue.
+            assert(qmsg->payload);
+            destq->deliver(qmsg->payload);
+        }
     }
     return c.matches.size();
 }
 
 /** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                    const Mutex::ScopedLock& locker)
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg)
 {
-    if (messages->acquire(position, msg)) {
-        observeAcquire(msg, locker);
+    bool ok;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        ok = messages->acquire(position, msg);
+        if (ok) observeAcquireLH(msg);
+    }
+    if (ok) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires();
+        }
         ++dequeueSincePurge;
         return true;
     }
@@ -782,35 +849,43 @@ bool Queue::acquire(const qpid::framing:
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    QueuedMessage removed;
+    QueuedMessage removed, qm(this, msg);
     bool dequeueRequired = false;
     {
         Mutex::ScopedLock locker(messageLock);
-        QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
-
-        dequeueRequired = messages->push(qm, removed);
-        if (dequeueRequired) {
-            observeAcquire(removed, locker);
-            if (mgmtObject) {
-                mgmtObject->inc_discardsLvq();
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_discardsLvq();
-            }
+        qm.position = ++sequence;
+        if (messages->push(qm, removed)) {
+            dequeueRequired = true;
+            observeAcquireLH(removed);
+        }
+        observeEnqueueLH(qm);
+        if (policy.get()) {
+            policy->enqueued(qm);
         }
         listeners.populate(copy);
-        observeEnqueue(qm, locker);
     }
-    copy.notify();
+    if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position);
+
+    mgntEnqStats(msg, mgmtObject, brokerMgmtObject);
+
     if (dequeueRequired) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            mgmtObject->inc_discardsLvq();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires();
+                brokerMgmtObject->inc_discardsLvq();
+        }
         if (isRecovery) {
             //can't issue new requests for the store until
             //recovery is complete
+            Mutex::ScopedLock locker(messageLock);
             pendingDequeues.push_back(removed);
         } else {
             dequeue(0, removed);
         }
     }
+    copy.notify();
 }
 
 void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
@@ -821,8 +896,8 @@ void isEnqueueComplete(uint32_t* result,
 /** function only provided for unit tests, or code not in critical message path */
 uint32_t Queue::getEnqueueCompleteMessageCount() const
 {
-    Mutex::ScopedLock locker(messageLock);
     uint32_t count = 0;
+    Mutex::ScopedLock locker(messageLock);
     messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
     return count;
 }
@@ -835,13 +910,13 @@ uint32_t Queue::getMessageCount() const
 
 uint32_t Queue::getConsumerCount() const
 {
-    Mutex::ScopedLock locker(consumerLock);
+    Mutex::ScopedLock locker(messageLock);
     return consumerCount;
 }
 
 bool Queue::canAutoDelete() const
 {
-    Mutex::ScopedLock locker(consumerLock);
+    Mutex::ScopedLock locker(messageLock);
     return autodelete && !consumerCount && !owner;
 }
 
@@ -948,14 +1023,18 @@ bool Queue::dequeue(TransactionContext* 
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
-
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
         if (!ctxt) {
-            observeDequeue(msg, locker);
+            if (policy.get()) policy->dequeued(msg);
+            messages->deleted(msg);
+            observeDequeueLH(msg);
         }
     }
+
+    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
+
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -972,8 +1051,13 @@ bool Queue::dequeue(TransactionContext* 
 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
-    Mutex::ScopedLock locker(messageLock);
-    observeDequeue(msg, locker);
+    {
+        Mutex::ScopedLock locker(messageLock);
+        if (policy.get()) policy->dequeued(msg);
+        messages->deleted(msg);
+        observeDequeueLH(msg);
+    }
+    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
     if (mgmtObject != 0) {
         _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
         const uint64_t contentSize = msg.payload->contentSize();
@@ -993,10 +1077,20 @@ void Queue::dequeueCommitted(const Queue
  * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
  * it from the logical (and persistent if applicable) queue
  */
-bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
+bool Queue::popAndDequeue(QueuedMessage& msg)
 {
-    if (messages->consume(msg)) {
-        observeAcquire(msg, locker);
+    bool popped;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        popped = messages->consume(msg);
+        if (popped) observeAcquireLH(msg);
+    }
+    if (popped) {
+        if (mgmtObject) {
+            mgmtObject->inc_acquires();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_acquires();
+        }
         dequeue(0, msg);
         return true;
     } else {
@@ -1006,13 +1100,9 @@ bool Queue::popAndDequeue(QueuedMessage&
 
 /**
  * Updates policy and management when a message has been dequeued,
- * expects messageLock to be held
  */
-void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeueLH(const QueuedMessage& msg)
 {
-    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
-    if (policy.get()) policy->dequeued(msg);
-    messages->deleted(msg);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->dequeued(msg);
@@ -1022,17 +1112,10 @@ void Queue::observeDequeue(const QueuedM
     }
 }
 
-/** updates queue observers when a message has become unavailable for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become unavailable for transfer
  */
-void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquireLH(const QueuedMessage& msg)
 {
-    if (mgmtObject) {
-        mgmtObject->inc_acquires();
-        if (brokerMgmtObject)
-            brokerMgmtObject->inc_acquires();
-    }
-
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->acquired(msg);
@@ -1042,17 +1125,10 @@ void Queue::observeAcquire(const QueuedM
     }
 }
 
-/** updates queue observers when a message has become re-available for transfer,
- * expects messageLock to be held
+/** updates queue observers when a message has become re-available for transfer
  */
-void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeueLH(const QueuedMessage& msg)
 {
-    if (mgmtObject) {
-        mgmtObject->inc_releases();
-        if (brokerMgmtObject)
-            brokerMgmtObject->inc_releases();
-    }
-
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
             (*i)->requeued(msg);
@@ -1062,6 +1138,33 @@ void Queue::observeRequeue(const QueuedM
     }
 }
 
+/** updates queue observers when a new consumer has subscribed to this queue.
+ */
+void Queue::observeConsumerAddLH( const Consumer& c)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerAdded(c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
+/** updates queue observers when a consumer has unsubscribed from this queue.
+ */
+void Queue::observeConsumerRemoveLH( const Consumer& c)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->consumerRemoved(c);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
+
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;
@@ -1209,23 +1312,21 @@ void Queue::configureImpl(const FieldTab
 void Queue::destroyed()
 {
     unbind(broker->getExchanges());
-    {
-        Mutex::ScopedLock locker(messageLock);
-        QueuedMessage m;
-        while(popAndDequeue(m, locker)) {
-            DeliverableMessage msg(m.payload);
-            if (alternateExchange.get()) {
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandonedViaAlt();
-                alternateExchange->routeWithAlternate(msg);
-            } else {
-                if (brokerMgmtObject)
-                    brokerMgmtObject->inc_abandoned();
-            }
+
+    QueuedMessage m;
+    while(popAndDequeue(m)) {
+        DeliverableMessage msg(m.payload);
+        if (alternateExchange.get()) {
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_abandonedViaAlt();
+            alternateExchange->routeWithAlternate(msg);
+        } else {
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_abandoned();
         }
-        if (alternateExchange.get())
-            alternateExchange->decAlternateUsers();
     }
+    if (alternateExchange.get())
+        alternateExchange->decAlternateUsers();
 
     if (store) {
         barrier.destroy();
@@ -1236,7 +1337,7 @@ void Queue::destroyed()
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
     notifyDeleted();
     {
-        Mutex::ScopedLock locker(messageLock);
+        Mutex::ScopedLock lock(messageLock);
         observers.clear();
     }
 }
@@ -1246,8 +1347,8 @@ void Queue::notifyDeleted()
     QueueListeners::ListenerSet set;
     {
         Mutex::ScopedLock locker(messageLock);
-        listeners.snapshot(set);
         deleted = true;
+        listeners.snapshot(set);
     }
     set.notifyAll();
 }
@@ -1265,6 +1366,7 @@ void Queue::unbind(ExchangeRegistry& exc
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
 {
+    Mutex::ScopedLock locker(messageLock);
     policy = _policy;
     if (policy.get())
         policy->setQueue(this);
@@ -1272,6 +1374,7 @@ void Queue::setPolicy(std::auto_ptr<Queu
 
 const QueuePolicy* Queue::getPolicy()
 {
+    Mutex::ScopedLock locker(messageLock);
     return policy.get();
 }
 
@@ -1553,8 +1656,12 @@ void Queue::recoveryComplete(ExchangeReg
                       << "\": exchange does not exist.");
     }
     //process any pending dequeues
-    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
-    pendingDequeues.clear();
+    std::deque<QueuedMessage> pd;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        pendingDequeues.swap(pd);
+    }
+    for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
 }
 
 void Queue::insertSequenceNumbers(const std::string& key)
@@ -1564,10 +1671,9 @@ void Queue::insertSequenceNumbers(const 
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
 
-/** updates queue observers and state when a message has become available for transfer,
- * expects messageLock to be held
+/** updates queue observers and state when a message has become available for transfer
  */
-void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueueLH(const QueuedMessage& m)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1576,10 +1682,6 @@ void Queue::observeEnqueue(const QueuedM
             QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
         }
     }
-    if (policy.get()) {
-        policy->enqueued(m);
-    }
-    mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
 }
 
 void Queue::updateEnqueued(const QueuedMessage& m)
@@ -1587,12 +1689,16 @@ void Queue::updateEnqueued(const QueuedM
     if (m.payload) {
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue(0, payload, true);
-        messages->updateAcquired(m);
-        if (policy.get()) {
-            policy->recoverEnqueued(payload);
+        {
+            Mutex::ScopedLock locker(messageLock);
+            messages->updateAcquired(m);
+            observeEnqueueLH(m);
+            if (policy.get()) {
+                policy->recoverEnqueued(payload);
+                policy->enqueued(m);
+            }
         }
-        Mutex::ScopedLock locker(messageLock);
-        observeEnqueue(m, locker);
+        mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1600,10 +1706,16 @@ void Queue::updateEnqueued(const QueuedM
 
 bool Queue::isEnqueued(const QueuedMessage& msg)
 {
+    Mutex::ScopedLock locker(messageLock);
     return !policy.get() || policy->isEnqueued(msg);
 }
 
+// Note: accessing listeners outside of lock is dangerous.  Caller must ensure the queue's
+// state is not changed while listeners is referenced.
 QueueListeners& Queue::getListeners() { return listeners; }
+
+// Note: accessing messages outside of lock is dangerous.  Caller must ensure the queue's
+// state is not changed while messages is referenced.
 Messages& Queue::getMessages() { return *messages; }
 const Messages& Queue::getMessages() const { return *messages; }
 
@@ -1616,13 +1728,13 @@ void Queue::checkNotDeleted(const Consum
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
 {
-    Mutex::ScopedLock locker(messageLock);
+    Mutex::ScopedLock lock(messageLock);
     observers.insert(observer);
 }
 
 void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
 {
-    Mutex::ScopedLock locker(messageLock);
+    Mutex::ScopedLock lock(messageLock);
     observers.erase(observer);
 }
 
@@ -1685,7 +1797,7 @@ Queue::UsageBarrier::UsageBarrier(Queue&
 
 bool Queue::UsageBarrier::acquire()
 {
-    Monitor::ScopedLock l(parent.messageLock);
+    Monitor::ScopedLock l(parent.messageLock);  /** @todo: use a dedicated lock instead of messageLock */
     if (parent.deleted) {
         return false;
     } else {

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h?rev=1300327&r1=1300326&r2=1300327&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h Tue Mar 13 20:08:33 2012
@@ -107,7 +107,6 @@ class Queue : public boost::enable_share
     QueueListeners listeners;
     std::auto_ptr<Messages> messages;
     std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
-    mutable qpid::sys::Mutex consumerLock;
     mutable qpid::sys::Monitor messageLock;
     mutable qpid::sys::Mutex ownershipLock;
     mutable uint64_t persistenceId;
@@ -143,17 +142,17 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
-     * must be held by caller */
-    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-    bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock);
-    // acquire message @ position, return true and set msg if acquire succeeds
-    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
-                 const sys::Mutex::ScopedLock& held);
+    /** update queue observers, stats, policy, etc. when the messages' state changes.
+     * messageLock must be held by the caller */
+    void observeEnqueueLH(const QueuedMessage& msg);
+    void observeAcquireLH(const QueuedMessage& msg);
+    void observeRequeueLH(const QueuedMessage& msg);
+    void observeDequeueLH(const QueuedMessage& msg);
+    void observeConsumerAddLH( const Consumer& );
+    void observeConsumerRemoveLH( const Consumer& );
 
+    bool popAndDequeue(QueuedMessage&);
+    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
     void configureImpl(const qpid::framing::FieldTable& settings);
@@ -355,6 +354,7 @@ class Queue : public boost::enable_share
 
     /** Apply f to each Observer on the queue */
     template <class F> void eachObserver(F f) {
+        sys::Mutex::ScopedLock l(messageLock);
         std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
     }
 

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=1300327&r1=1300326&r2=1300327&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.cpp Tue Mar 13 20:08:33 2012
@@ -79,10 +79,6 @@ void QueueListeners::NotificationSet::no
     std::for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
 }
 
-bool QueueListeners::contains(Consumer::shared_ptr c) const {
-    return c->inListeners;
-}
-
 void QueueListeners::ListenerSet::notifyAll()
 {
     std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify));

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=1300327&r1=1300326&r2=1300327&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/QueueListeners.h Tue Mar 13 20:08:33 2012
@@ -30,7 +30,7 @@ namespace broker {
 /**
  * Track and notify components that wish to be notified of messages
  * that become available on a queue.
- * 
+ *
  * None of the methods defined here are protected by locking. However
  * the populate method allows a 'snapshot' to be taken of the
  * listeners to be notified. NotificationSet::notify() may then be
@@ -61,11 +61,10 @@ class QueueListeners
       friend class QueueListeners;
     };
 
-    void addListener(Consumer::shared_ptr);    
-    void removeListener(Consumer::shared_ptr);    
+    void addListener(Consumer::shared_ptr);
+    void removeListener(Consumer::shared_ptr);
     void populate(NotificationSet&);
     void snapshot(ListenerSet&);
-    bool contains(Consumer::shared_ptr c) const;
     void notifyAll();
 
     template <class F> void eachListener(F f) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org