You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/03/19 21:07:38 UTC

svn commit: r1302629 - in /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h

Author: kgiusti
Date: Mon Mar 19 20:07:37 2012
New Revision: 1302629

URL: http://svn.apache.org/viewvc?rev=1302629&view=rev
Log:
QPID-3890: revert changes to observer methods interfaces

Modified:
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1302629&r1=1302628&r2=1302629&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 19 20:07:37 2012
@@ -301,7 +301,7 @@ void Queue::requeue(const QueuedMessage&
             {
                 Mutex::ScopedLock locker(messageLock);
                 messages->release(msg);
-                observeRequeueLH(msg);
+                observeRequeue(msg, locker);
                 listeners.populate(copy);
             }
 
@@ -424,7 +424,7 @@ Queue::ConsumeCode Queue::consumeNextMes
                     Mutex::ScopedLock locker(messageLock);
                     bool ok = allocator->allocate( c->getName(), msg );  // inform allocator
                     (void) ok; assert(ok);
-                    observeAcquireLH(msg);
+                    observeAcquire(msg, locker);
                 }
                 if (mgmtObject) {
                     mgmtObject->inc_acquires();
@@ -535,7 +535,7 @@ void Queue::consume(Consumer::shared_ptr
         if (autoDeleteTimeout && autoDeleteTask) {
             autoDeleteTask->cancel();
         }
-        observeConsumerAddLH(*c);
+        observeConsumerAdd(*c, locker);
     }
     if (mgmtObject != 0)
         mgmtObject->inc_consumerCount ();
@@ -547,7 +547,7 @@ void Queue::cancel(Consumer::shared_ptr 
         Mutex::ScopedLock locker(messageLock);
         consumerCount--;
         if(exclusive) exclusive = 0;
-        observeConsumerRemoveLH(*c);
+        observeConsumerRemove(*c, locker);
     }
     if (mgmtObject != 0)
         mgmtObject->dec_consumerCount ();
@@ -559,7 +559,7 @@ QueuedMessage Queue::get(){
     {
         Mutex::ScopedLock locker(messageLock);
         ok = messages->consume(msg);
-        if (ok) observeAcquireLH(msg);
+        if (ok) observeAcquire(msg, locker);
     }
 
     if (ok && mgmtObject) {
@@ -615,7 +615,7 @@ void Queue::purgeExpired(qpid::sys::Dura
                     // KAG: should be safe to retake lock after the removeIf, since
                     // no other thread can touch these messages after the removeIf() call
                     Mutex::ScopedLock locker(messageLock);
-                    observeAcquireLH(*i);
+                    observeAcquire(*i, locker);
                 }
                 dequeue( 0, *i );
             }
@@ -774,7 +774,7 @@ uint32_t Queue::purge(const uint32_t pur
                 // KAG: should be safe to retake lock after the removeIf, since
                 // no other thread can touch these messages after the removeIf call
                 Mutex::ScopedLock locker(messageLock);
-                observeAcquireLH(*qmsg);
+                observeAcquire(*qmsg, locker);
             }
             dequeue(0, *qmsg);
             QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
@@ -814,7 +814,7 @@ uint32_t Queue::move(const Queue::shared
              qmsg != c.matches.end(); ++qmsg) {
             {
                 Mutex::ScopedLock locker(messageLock);
-                observeAcquireLH(*qmsg);
+                observeAcquire(*qmsg, locker);
             }
             dequeue(0, *qmsg);
             // and move to destination Queue.
@@ -832,7 +832,7 @@ bool Queue::acquire(const qpid::framing:
     {
         Mutex::ScopedLock locker(messageLock);
         ok = messages->acquire(position, msg);
-        if (ok) observeAcquireLH(msg);
+        if (ok) observeAcquire(msg, locker);
     }
     if (ok) {
         if (mgmtObject) {
@@ -856,9 +856,9 @@ void Queue::push(boost::intrusive_ptr<Me
         qm.position = ++sequence;
         if (messages->push(qm, removed)) {
             dequeueRequired = true;
-            observeAcquireLH(removed);
+            observeAcquire(removed, locker);
         }
-        observeEnqueueLH(qm);
+        observeEnqueue(qm, locker);
         if (policy.get()) {
             policy->enqueued(qm);
         }
@@ -1029,7 +1029,7 @@ bool Queue::dequeue(TransactionContext* 
         if (!ctxt) {
             if (policy.get()) policy->dequeued(msg);
             messages->deleted(msg);
-            observeDequeueLH(msg);
+            observeDequeue(msg, locker);
         }
     }
 
@@ -1057,7 +1057,7 @@ void Queue::dequeueCommitted(const Queue
         Mutex::ScopedLock locker(messageLock);
         if (policy.get()) policy->dequeued(msg);
         messages->deleted(msg);
-        observeDequeueLH(msg);
+        observeDequeue(msg, locker);
     }
     mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
     if (mgmtObject != 0) {
@@ -1085,7 +1085,7 @@ bool Queue::popAndDequeue(QueuedMessage&
     {
         Mutex::ScopedLock locker(messageLock);
         popped = messages->consume(msg);
-        if (popped) observeAcquireLH(msg);
+        if (popped) observeAcquire(msg, locker);
     }
     if (popped) {
         if (mgmtObject) {
@@ -1102,8 +1102,9 @@ bool Queue::popAndDequeue(QueuedMessage&
 
 /**
  * Updates policy and management when a message has been dequeued,
+ * Requires messageLock be held by caller.
  */
-void Queue::observeDequeueLH(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1114,9 +1115,10 @@ void Queue::observeDequeueLH(const Queue
     }
 }
 
-/** updates queue observers when a message has become unavailable for transfer
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
  */
-void Queue::observeAcquireLH(const QueuedMessage& msg)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1128,8 +1130,9 @@ void Queue::observeAcquireLH(const Queue
 }
 
 /** updates queue observers when a message has become re-available for transfer
+ *  Requires messageLock be held by caller.
  */
-void Queue::observeRequeueLH(const QueuedMessage& msg)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1142,7 +1145,7 @@ void Queue::observeRequeueLH(const Queue
 
 /** updates queue observers when a new consumer has subscribed to this queue.
  */
-void Queue::observeConsumerAddLH( const Consumer& c)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1155,7 +1158,7 @@ void Queue::observeConsumerAddLH( const 
 
 /** updates queue observers when a consumer has unsubscribed from this queue.
  */
-void Queue::observeConsumerRemoveLH( const Consumer& c)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
         try{
@@ -1674,8 +1677,9 @@ void Queue::insertSequenceNumbers(const 
 }
 
 /** updates queue observers and state when a message has become available for transfer
+ *  Requires messageLock be held by caller.
  */
-void Queue::observeEnqueueLH(const QueuedMessage& m)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
 {
     for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
         try {
@@ -1694,7 +1698,7 @@ void Queue::updateEnqueued(const QueuedM
         {
             Mutex::ScopedLock locker(messageLock);
             messages->updateAcquired(m);
-            observeEnqueueLH(m);
+            observeEnqueue(m, locker);
             if (policy.get()) {
                 policy->recoverEnqueued(payload);
                 policy->enqueued(m);

Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h?rev=1302629&r1=1302628&r2=1302629&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 19 20:07:37 2012
@@ -144,12 +144,12 @@ class Queue : public boost::enable_share
 
     /** update queue observers, stats, policy, etc when the messages' state changes.
      * messageLock is held by caller */
-    void observeEnqueueLH(const QueuedMessage& msg);
-    void observeAcquireLH(const QueuedMessage& msg);
-    void observeRequeueLH(const QueuedMessage& msg);
-    void observeDequeueLH(const QueuedMessage& msg);
-    void observeConsumerAddLH( const Consumer& );
-    void observeConsumerRemoveLH( const Consumer& );
+    void observeEnqueue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+    void observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+    void observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+    void observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+    void observeConsumerAdd( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
+    void observeConsumerRemove( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
 
     bool popAndDequeue(QueuedMessage&);
     bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org