You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/22 19:49:25 UTC

svn commit: r1174282 - in /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker: MessageAllocator.cpp MessageGroupManager.cpp Queue.cpp Queue.h QueueObserver.h

Author: kgiusti
Date: Thu Sep 22 17:49:25 2011
New Revision: 1174282

URL: http://svn.apache.org/viewvc?rev=1174282&view=rev
Log:
QPID-3346: incorporate feedback from aconway's review

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp Thu Sep 22 17:49:25 2011
@@ -52,6 +52,7 @@ bool MessageAllocator::acquirable( const
                                    const QueuedMessage&,
                                    const qpid::sys::Mutex::ScopedLock&)
 {
+    // by default, all messages present on the queue are acquireable
     return true;
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Thu Sep 22 17:49:25 2011
@@ -28,13 +28,13 @@
 using namespace qpid::broker;
 
 namespace {
-    const std::string GroupQueryKey("qpid.message_group_queue");
-    const std::string GroupHeaderKey("group_header_key");
-    const std::string GroupStateKey("group_state");
-    const std::string GroupIdKey("group_id");
-    const std::string GroupMsgCount("msg_count");
-    const std::string GroupTimestamp("timestamp");
-    const std::string GroupConsumer("consumer");
+    const std::string GROUP_QUERY_KEY("qpid.message_group_queue");
+    const std::string GROUP_HEADER_KEY("group_header_key");
+    const std::string GROUP_STATE_KEY("group_state");
+    const std::string GROUP_ID_KEY("group_id");
+    const std::string GROUP_MSG_COUNT("msg_count");
+    const std::string GROUP_TIMESTAMP("timestamp");
+    const std::string GROUP_CONSUMER("consumer");
 }
 
 
@@ -66,12 +66,8 @@ void MessageGroupManager::enqueued( cons
     if (total == 1) {
         // newly created group, no owner
         state.group = group;
-#ifdef NDEBUG
+        assert(freeGroups.find(qm.position) == freeGroups.end());
         freeGroups[qm.position] = &state;
-#else
-        bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
-        (void) unique; assert(unique);
-#endif
     }
 }
 
@@ -261,22 +257,22 @@ void MessageGroupManager::query(qpid::ty
         }
     **/
 
-    assert(status.find(GroupQueryKey) == status.end());
+    assert(status.find(GROUP_QUERY_KEY) == status.end());
     qpid::types::Variant::Map state;
     qpid::types::Variant::List groups;
 
-    state[GroupHeaderKey] = groupIdHeader;
+    state[GROUP_HEADER_KEY] = groupIdHeader;
     for (GroupMap::const_iterator g = messageGroups.begin();
          g != messageGroups.end(); ++g) {
         qpid::types::Variant::Map info;
-        info[GroupIdKey] = g->first;
-        info[GroupMsgCount] = g->second.members.size();
-        info[GroupTimestamp] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
-        info[GroupConsumer] = g->second.owner;
+        info[GROUP_ID_KEY] = g->first;
+        info[GROUP_MSG_COUNT] = g->second.members.size();
+        info[GROUP_TIMESTAMP] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+        info[GROUP_CONSUMER] = g->second.owner;
         groups.push_back(info);
     }
-    state[GroupStateKey] = groups;
-    status[GroupQueryKey] = state;
+    state[GROUP_STATE_KEY] = groups;
+    status[GROUP_QUERY_KEY] = state;
 }
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Thu Sep 22 17:49:25 2011
@@ -224,14 +224,7 @@ void Queue::requeue(const QueuedMessage&
             	enqueue(0, payload);
             }
         }
-
-        for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
-            try{
-                (*i)->requeued(msg);
-            } catch (const std::exception& e) {
-                QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
-            }
-        }
+        observeRequeue(msg, locker);
     }
     copy.notify();
 }
@@ -241,7 +234,7 @@ bool Queue::acquireMessageAt(const Seque
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    if (acquire(position, message )) {
+    if (acquire(position, message, locker)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
     } else {
@@ -262,7 +255,7 @@ bool Queue::acquire(const QueuedMessage&
     }
 
     QueuedMessage copy(msg);
-    if (acquire( msg.position, copy )) {
+    if (acquire( msg.position, copy, locker)) {
         QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
         return true;
     }
@@ -317,7 +310,7 @@ Queue::ConsumeCode Queue::consumeNextMes
         if (msg.payload->hasExpired()) {
             QPID_LOG(debug, "Message expired from queue '" << name << "'");
             c->position = msg.position;
-            acquire( msg.position, msg );
+            acquire( msg.position, msg, locker);
             dequeue( 0, msg );
             continue;
         }
@@ -328,7 +321,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             if (c->accept(msg.payload)) {
                 bool ok = allocator->acquirable( c->getName(), msg, locker );  // inform allocator
                 (void) ok; assert(ok);
-                ok = acquire( msg.position, msg );
+                ok = acquire( msg.position, msg, locker);
                 (void) ok; assert(ok);
                 m = msg;
                 c->position = m.position;
@@ -435,9 +428,9 @@ void Queue::consume(Consumer::shared_ptr
             autoDeleteTask->cancel();
         }
     }
+    Mutex::ScopedLock locker(messageLock);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
-            Mutex::ScopedLock locker(messageLock);
             (*i)->consumerAdded(*c);
         } catch (const std::exception& e) {
             QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
@@ -454,9 +447,9 @@ void Queue::cancel(Consumer::shared_ptr 
         if (mgmtObject != 0)
             mgmtObject->dec_consumerCount ();
     }
+    Mutex::ScopedLock locker(messageLock);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
-            Mutex::ScopedLock locker(messageLock);
             (*i)->consumerRemoved(*c);
         } catch (const std::exception& e) {
             QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
@@ -468,7 +461,7 @@ QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
     if (messages->pop(msg))
-        acquired( msg );
+        observeAcquire(msg, locker);
     return msg;
 }
 
@@ -504,7 +497,7 @@ void Queue::purgeExpired(qpid::sys::Dura
              i != expired.end(); ++i) {
             {
                 Mutex::ScopedLock locker(messageLock);
-                acquired( *i );   // expects messageLock held
+                observeAcquire(*i, locker);
             }
             dequeue( 0, *i );
         }
@@ -637,7 +630,7 @@ uint32_t Queue::purge(const uint32_t pur
     for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
          qmsg != c.matches.end(); ++qmsg) {
         // Update observers and message state:
-        acquired(*qmsg);
+        observeAcquire(*qmsg, locker);
         dequeue(0, *qmsg);
         // now reroute if necessary
         if (dest.get()) {
@@ -661,7 +654,7 @@ uint32_t Queue::move(const Queue::shared
     for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
          qmsg != c.matches.end(); ++qmsg) {
         // Update observers and message state:
-        acquired(*qmsg);
+        observeAcquire(*qmsg, locker);
         dequeue(0, *qmsg);
         // and move to destination Queue.
         assert(qmsg->payload);
@@ -673,21 +666,22 @@ uint32_t Queue::move(const Queue::shared
 /** Acquire the front (oldest) message from the in-memory queue.
  * assumes messageLock held by caller
  */
-void Queue::pop()
+void Queue::pop(const Mutex::ScopedLock& locker)
 {
     assertClusterSafe();
     QueuedMessage msg;
     if (messages->pop(msg)) {
-        acquired( msg ); // mark it removed
+        observeAcquire(msg, locker);
         ++dequeueSincePurge;
     }
 }
 
 /** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg )
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+                    const Mutex::ScopedLock& locker)
 {
     if (messages->remove(position, msg)) {
-        acquired( msg );
+        observeAcquire(msg, locker);
         ++dequeueSincePurge;
         return true;
     }
@@ -705,12 +699,13 @@ void Queue::push(boost::intrusive_ptr<Me
         if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
 
         dequeueRequired = messages->push(qm, removed);
+        if (dequeueRequired)
+            observeAcquire(removed, locker);
         listeners.populate(copy);
-        enqueued(qm);
+        observeEnqueue(qm, locker);
     }
     copy.notify();
     if (dequeueRequired) {
-        acquired( removed );  // tell observers
         if (isRecovery) {
             //can't issue new requests for the store until
             //recovery is complete
@@ -841,7 +836,7 @@ bool Queue::dequeue(TransactionContext* 
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
         if (!ctxt) {
-            dequeued(msg);
+            observeDequeue(msg, locker);
         }
     }
     // This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -861,7 +856,7 @@ bool Queue::dequeue(TransactionContext* 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    dequeued(msg);
+    observeDequeue(msg, locker);
     if (mgmtObject != 0) {
         mgmtObject->inc_msgTxnDequeues();
         mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -872,11 +867,11 @@ void Queue::dequeueCommitted(const Queue
  * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
  * it from the logical (and persistent if applicable) queue
  */
-void Queue::popAndDequeue()
+void Queue::popAndDequeue(const Mutex::ScopedLock& held)
 {
     if (!messages->empty()) {
         QueuedMessage msg = messages->front();
-        pop();
+        pop(held);
         dequeue(0, msg);
     }
 }
@@ -885,7 +880,7 @@ void Queue::popAndDequeue()
  * Updates policy and management when a message has been dequeued,
  * expects messageLock to be held
  */
-void Queue::dequeued(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
 {
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
@@ -901,7 +896,7 @@ void Queue::dequeued(const QueuedMessage
 /** updates queue observers when a message has become unavailable for transfer,
  * expects messageLock to be held
  */
-void Queue::acquired(const QueuedMessage& msg)
+void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -912,6 +907,20 @@ void Queue::acquired(const QueuedMessage
     }
 }
 
+/** updates queue observers when a message has become re-available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->requeued(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+        }
+    }
+}
+
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;
@@ -1034,7 +1043,7 @@ void Queue::destroyed()
         while(!messages->empty()){
             DeliverableMessage msg(messages->front().payload);
             alternateExchange->routeWithAlternate(msg);
-            popAndDequeue();
+            popAndDequeue(locker);
         }
         alternateExchange->decAlternateUsers();
     }
@@ -1328,7 +1337,10 @@ void Queue::insertSequenceNumbers(const 
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
 
-void Queue::enqueued(const QueuedMessage& m)
+/** updates queue observers and state when a message has become available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1351,7 +1363,8 @@ void Queue::updateEnqueued(const QueuedM
         if (policy.get()) {
             policy->recoverEnqueued(payload);
         }
-        enqueued(m);
+        Mutex::ScopedLock locker(messageLock);
+        observeEnqueue(m, locker);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1375,6 +1388,7 @@ void Queue::checkNotDeleted()
 
 void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
 {
+    Mutex::ScopedLock locker(messageLock);
     observers.insert(observer);
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h Thu Sep 22 17:49:25 2011
@@ -142,16 +142,19 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
-    /** update queue observers with new message state */
-    void enqueued(const QueuedMessage& msg);
-    void acquired(const QueuedMessage& msg);
-    void dequeued(const QueuedMessage& msg);
+    /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+     * must be held by caller */
+    void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+    void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
 
     /** modify the Queue's message container - assumes messageLock held */
-    void pop();             // acquire front msg
-    void popAndDequeue();   // acquire and dequeue front msg
+    void pop(const sys::Mutex::ScopedLock& held);           // acquire front msg
+    void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
     // acquire message @ position, return true and set msg if acquire succeeds
-    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg );
+    bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+                 const sys::Mutex::ScopedLock& held);
 
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h Thu Sep 22 17:49:25 2011
@@ -35,9 +35,11 @@ class Consumer;
  * the queue it has been delivered to.  A message can be considered in one of three states
  * with respect to the queue:
  *
- * 1) "Available" - available for transfer to consumers,
- * 2) "Locked" - to a particular consumer, no longer available for transfer, but not
- * considered fully dequeued.
+ * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
+ *
+ * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
+ * (by either browse or acquire), but still considered on the queue.
+ *
  * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
  *
  * The queue events that are observable are:
@@ -45,15 +47,15 @@ class Consumer;
  * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
  * (e.g. browse or acquire)
  *
- * "Acquired" - the message is "Locked" - a consumer has claimed exclusive access to it.
- * It is no longer available for other consumers to browse or acquire, but it is not yet
- * considered dequeued as it may be requeued by the consumer.
+ * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
+ * for other consumers to browse or acquire, but it is not yet considered dequeued as it
+ * may be requeued by the consumer.
  *
- * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue
- * at its original position and returns to the "Available" state.
+ * "Requeued" - a previously-acquired message is released by its owner: it is put back on
+ * the queue at its original position and returns to the "Available" state.
  *
- * "Dequeued" - a Locked message is no longer queued.  At this point, the queue no longer
- * tracks the message, and the broker considers the consumer's transaction complete.
+ * "Dequeued" - a message is no longer queued.  At this point, the queue no longer tracks
+ * the message, and the broker considers the consumer's transaction complete.
  */
 class QueueObserver
 {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org