You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/09/14 23:09:31 UTC

svn commit: r575792 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: BrokerQueue.cpp PersistableMessage.h

Author: cctrieloff
Date: Fri Sep 14 14:09:31 2007
New Revision: 575792

URL: http://svn.apache.org/viewvc?rev=575792&view=rev
Log:
Convert the dequeue-complete flag to a counter to support asynchronous dequeue

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=575792&r1=575791&r2=575792&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Fri Sep 14 14:09:31 2007
@@ -323,6 +323,7 @@
 bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
+	msg->dequeueAsync(); //increment the async dequeue counter -- needed when a message is sent to more than one queue
         store->dequeue(ctxt, *msg.get(), *this);
 	return true;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=575792&r1=575791&r2=575792&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Sep 14 14:09:31 2007
@@ -38,6 +38,7 @@
 class PersistableMessage : public Persistable
 {
     sys::Monitor asyncEnqueueLock;
+    sys::Monitor asyncDequeueLock;
 
     /**
      * Tracks the number of outstanding asynchronous enqueue
@@ -49,12 +50,13 @@
     int asyncEnqueueCounter;
 
     /**
-     * Needs to be set false on Message construction, then
-     * set once the dequeueis complete, it gets set
-     * For transient, once dequeued, for durable, once
-     * dequeue record has been stored.
+     * Tracks the number of outstanding asynchronous dequeue
+     * operations. When the message is dequeued asynchronously the
+     * count is incremented; when that dequeue completes it is
+     * decremented. Thus when it is 0, there are no outstanding
+     * dequeues.
      */
-    bool dequeueCompleted;
+    int asyncDequeueCounter;
 
 public:
     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -66,7 +68,10 @@
 
     virtual ~PersistableMessage() {};
 
-    PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {}
+    PersistableMessage(): 
+    	asyncEnqueueCounter(0), 
+    	asyncDequeueCounter(0) 
+	{}
     
     inline void waitForEnqueueComplete() {
         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -94,8 +99,32 @@
         asyncEnqueueCounter++; 
     }
 
-    inline bool isDequeueComplete() { return dequeueCompleted; }
-    inline void dequeueComplete() { dequeueCompleted = true; }
+    inline bool isDequeueComplete() { 
+        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        return asyncDequeueCounter == 0;
+    }
+    
+    inline void dequeueComplete() { 
+        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        if (asyncDequeueCounter > 0) {
+            if (--asyncDequeueCounter == 0) {
+                asyncDequeueLock.notify();
+            }
+        }
+    }
+
+    inline void waitForDequeueComplete() {
+        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        while (asyncDequeueCounter > 0) {
+            asyncDequeueLock.wait();
+        }
+    }
+
+    inline void dequeueAsync() { 
+        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        asyncDequeueCounter++; 
+    }
+
     
 };