You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/19 13:18:28 UTC

svn commit: r596277 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/Queue.cpp broker/Queue.h sys/Serializer.cpp

Author: gsim
Date: Mon Nov 19 04:18:26 2007
New Revision: 596277

URL: http://svn.apache.org/viewvc?rev=596277&view=rev
Log:
Fixes causing lost 'events' in queue dispatch


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=596277&r1=596276&r2=596277&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 19 04:18:26 2007
@@ -46,8 +46,7 @@
              const ConnectionToken* const _owner,
              Manageable* parent) :
 
-    dispatching(false),
-	name(_name), 
+    name(_name), 
     autodelete(_autodelete),
     store(_store),
     owner(_owner), 
@@ -76,8 +75,7 @@
 {
     // signal SemanticHander to ack completed dequeues
     // then dispatch to ack...
-	if (!dispatching)
-        serializer.execute(dispatchCallback);
+  serializer.execute(dispatchCallback);
 }
 
 
@@ -102,8 +100,7 @@
             push(msg);
         }
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
-        if (!dispatching)
-		    serializer.execute(dispatchCallback);
+	serializer.execute(dispatchCallback);
     }
 }
 
@@ -130,8 +127,7 @@
     push(msg);
     if (mgmtObject != 0)
         mgmtObject->enqueue (msg->contentSize (), mask);
-    if (!dispatching)
-        serializer.execute(dispatchCallback);
+    serializer.execute(dispatchCallback);
    
 }
 
@@ -141,8 +137,7 @@
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
     }
-    if (!dispatching)
-	    serializer.execute(dispatchCallback);
+    serializer.execute(dispatchCallback);
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
@@ -158,8 +153,7 @@
 
 void Queue::requestDispatch(Consumer::ptr c){
     if (!c || c->preAcquires()) {
-        if (!dispatching)
-		    serializer.execute(dispatchCallback);
+      serializer.execute(dispatchCallback);
     } else {
         DispatchFunctor f(*this, c);
         serializer.execute(f);
@@ -235,7 +229,6 @@
 
 void Queue::dispatch()
 {
-	dispatching = true;
      QueuedMessage msg(this);
      while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
          if (dispatch(msg)) {
@@ -249,7 +242,6 @@
          }        
      }
      serviceAllBrowsers();
-	 dispatching = false;
 }
 
 void Queue::serviceAllBrowsers()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=596277&r1=596276&r2=596277&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Nov 19 04:18:26 2007
@@ -75,7 +75,6 @@
                 void operator()();
             };
 
-			bool dispatching;                
             const string name;
             const bool autodelete;
             MessageStore* const store;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp?rev=596277&r1=596276&r2=596277&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp Mon Nov 19 04:18:26 2007
@@ -58,7 +58,7 @@
 
 void SerializerBase::wait() {
     Mutex::ScopedLock l(lock);
-    lock.wait();
+    if (state == IDLE) lock.wait();
 }
 
 void SerializerBase::run() {