You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/29 12:54:22 UTC

svn commit: r599395 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/client/ qpid/sys/ qpid/sys/posix/ tests/

Author: gsim
Date: Thu Nov 29 03:54:17 2007
New Revision: 599395

URL: http://svn.apache.org/viewvc?rev=599395&view=rev
Log:
Changes to threading: the queue serialiser has been removed; IO threads are now used to drive dispatch to consumers.
Fix to PersistableMessage: use the correct lock when accessing synclist, and don't hold the enqueue lock when notifying queues.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Nov 29 03:54:17 2007
@@ -132,6 +132,7 @@
   qpid/Exception.cpp \
   qpid/Plugin.cpp \
   qpid/Url.cpp \
+  qpid/sys/AggregateOutput.cpp \
   qpid/sys/AsynchIOAcceptor.cpp \
   qpid/sys/Dispatcher.cpp \
   qpid/sys/Runnable.cpp \
@@ -408,6 +409,8 @@
   qpid/sys/Module.h \
   qpid/sys/Monitor.h \
   qpid/sys/Mutex.h \
+  qpid/sys/OutputControl.h \
+  qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
   qpid/sys/Runnable.h \
   qpid/sys/RefCountedMap.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu Nov 29 03:54:17 2007
@@ -302,9 +302,6 @@
 
     if(!nowait)
         getProxy().getBasic().consumeOk(newTag);
-
-    //allow messages to be dispatched if required as there is now a consumer:
-    queue->requestDispatch();
 } 
         
 void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Nov 29 03:54:17 2007
@@ -44,6 +44,7 @@
 
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
     broker(broker_),
+    outputTasks(*out_),
     out(out_),
     framemax(65536), 
     heartbeat(0),
@@ -94,6 +95,11 @@
                  e.what());
         assert(0);
     }
+}
+
+bool Connection::doOutput()
+{
+    return outputTasks.doOutput();
 }
 
 void Connection::closeChannel(uint16_t id) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Nov 29 03:54:17 2007
@@ -29,6 +29,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
@@ -70,6 +71,9 @@
 
     Broker& broker;
     std::vector<Queue::shared_ptr> exclusiveQueues;
+    
+    //contained output tasks
+    sys::AggregateOutput outputTasks;
 
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
@@ -77,6 +81,7 @@
     void idleOut();
     void idleIn();
     void closed();
+    bool doOutput();
 
     void closeChannel(framing::ChannelId channel);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Thu Nov 29 03:54:17 2007
@@ -53,7 +53,9 @@
             Consumer(bool preAcquires = true) : acquires(preAcquires) {}
             bool preAcquires() const { return acquires; }
             virtual bool deliver(QueuedMessage& msg) = 0;
+            virtual void notify() = 0;
             virtual bool filter(intrusive_ptr<Message>) { return true; }
+            virtual bool accept(intrusive_ptr<Message>) { return true; }
             virtual ~Consumer(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Thu Nov 29 03:54:17 2007
@@ -139,8 +139,6 @@
     string tag = destination;
     state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
                     tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
-    // Dispatch messages as there is now a consumer.
-    queue->requestDispatch();
 }
 
 void

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Thu Nov 29 03:54:17 2007
@@ -28,11 +28,11 @@
 
 void PersistableMessage::flush()
 {
-	sys::ScopedLock<sys::Mutex> l(storeLock);
-	if (store) {
-         for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-			  store->flush(*(*i));
-         } 
+    sys::ScopedLock<sys::Mutex> l(storeLock);
+    if (store) {
+        for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+            store->flush(*(*i));
+        } 
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Thu Nov 29 03:54:17 2007
@@ -43,7 +43,7 @@
 {
     sys::Monitor asyncEnqueueLock;
     sys::Monitor asyncDequeueLock;
-	sys::Mutex storeLock;
+    sys::Mutex storeLock;
 	
     /**
      * Tracks the number of outstanding asynchronous enqueue
@@ -84,12 +84,12 @@
     	asyncEnqueueCounter(0), 
     	asyncDequeueCounter(0),
         store(0),
-		contentReleased(false) 
-	{}
+        contentReleased(false) 
+        {}
 
     void flush();
     
-	inline bool isContentReleased()const {return contentReleased; }
+    inline bool isContentReleased()const {return contentReleased; }
 	
     inline void waitForEnqueueComplete() {
         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -104,27 +104,35 @@
     }
 
     inline void enqueueComplete() {
-        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
-        if (asyncEnqueueCounter > 0) {
-            if (--asyncEnqueueCounter == 0) {
-                asyncEnqueueLock.notify();
-				if (store) {
-                    for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
-        	            (*i)->notifyDurableIOComplete();
-                    } 
+        bool notify = false;
+        {
+            sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+            if (asyncEnqueueCounter > 0) {
+                if (--asyncEnqueueCounter == 0) {
+                    asyncEnqueueLock.notify();
+                    notify = true;
                 }
             }
         }
+        if (notify) {
+            sys::ScopedLock<sys::Mutex> l(storeLock);
+            if (store) {
+                for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+                    (*i)->notifyDurableIOComplete();
+                } 
+                synclist.clear();
+            }            
+        }
     }
 
     inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) { 
-		if (_store){
-			sys::ScopedLock<sys::Mutex> l(storeLock);
-		    store = _store;
-		    synclist.push_back(queue);
-		}
-	    enqueueAsync();
-	}
+        if (_store){
+            sys::ScopedLock<sys::Mutex> l(storeLock);
+            store = _store;
+            synclist.push_back(queue);
+        }
+        enqueueAsync();
+    }
 
     inline void enqueueAsync() { 
         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -154,13 +162,13 @@
     }
 
     inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) { 
-		if (_store){
+        if (_store){
             sys::ScopedLock<sys::Mutex> l(storeLock);
             store = _store;
-			synclist.push_back(queue);
-		}
-	    dequeueAsync();
-	}
+            synclist.push_back(queue);
+        }
+        dequeueAsync();
+    }
 
     inline void dequeueAsync() { 
         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Nov 29 03:54:17 2007
@@ -31,7 +31,8 @@
 #include <iostream>
 #include <boost/bind.hpp>
 #include "QueueRegistry.h"
-
+#include <algorithm>
+#include <functional>
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -40,6 +41,8 @@
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using std::for_each;
+using std::mem_fun;
 
 Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
@@ -50,10 +53,9 @@
     autodelete(_autodelete),
     store(_store),
     owner(_owner), 
-    next(0),
-    persistenceId(0),
-    serializer(false),
-    dispatchCallback(*this)
+    consumerCount(0),
+    exclusive(false),
+    persistenceId(0)
 {
     if (parent != 0)
     {
@@ -73,9 +75,8 @@
 
 void Queue::notifyDurableIOComplete()
 {
-    // signal SemanticHander to ack completed dequeues
-    // then dispatch to ack...
-  serializer.execute(dispatchCallback);
+    Mutex::ScopedLock locker(messageLock);
+    notify();
 }
 
 
@@ -110,7 +111,6 @@
             push(msg);
         }
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
-	serializer.execute(dispatchCallback);
     }
 }
 
@@ -148,17 +148,13 @@
             mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
         }
     }
-    serializer.execute(dispatchCallback);
-   
 }
 
 void Queue::requeue(const QueuedMessage& msg){
-    {
-        Mutex::ScopedLock locker(messageLock);
-        msg.payload->enqueueComplete(); // mark the message as enqueued
-        messages.push_front(msg);
-    }
-    serializer.execute(dispatchCallback);
+    Mutex::ScopedLock locker(messageLock);
+    msg.payload->enqueueComplete(); // mark the message as enqueued
+    messages.push_front(msg);
+    notify();
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
@@ -172,186 +168,170 @@
     return false;
 }
 
-void Queue::requestDispatch(Consumer::ptr c){
-    if (!c || c->preAcquires()) {
-      serializer.execute(dispatchCallback);
-    } else {
-        DispatchFunctor f(*this, c);
-        serializer.execute(f);
-    }
-}
-
-void Queue::flush(DispatchCompletion& completion)
-{
-    DispatchFunctor f(*this, &completion);
-    serializer.execute(f);
-}
-
 /**
  * Return true if the message can be excluded. This is currently the
- * case if the queue has an exclusive consumer that will never want
- * the message, or if the queue is exclusive to a single connection
- * and has a single consumer (covers the JMS topic case).
+ * case if the queue is exclusive and has an exclusive consumer that
+ * doesn't want the message or has a single consumer that doesn't want
+ * the message (covers the JMS topic case).
  */
-bool Queue::exclude(intrusive_ptr<Message> msg)
+bool Queue::canExcludeUnwanted()
+{
+    Mutex::ScopedLock locker(consumerLock);
+    return hasExclusiveOwner() && (exclusive || consumerCount == 1);
+}
+
+
+bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
 {
-    RWlock::ScopedWlock locker(consumerLock);
-    if (exclusive) {
-        return !exclusive->filter(msg);
-    } else if (hasExclusiveOwner() && acquirers.size() == 1) {
-        return !acquirers[0]->filter(msg);
+    if (c.preAcquires()) {
+        return consumeNextMessage(m, c);
     } else {
-        return false;
+        return browseNextMessage(m, c);
     }
 }
 
-Consumer::ptr Queue::allocate()
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
 {
-    RWlock::ScopedWlock locker(consumerLock);
- 
-    if (acquirers.empty()) {
-        return Consumer::ptr();
-    } else if (exclusive){
-        return exclusive;
-    } else {
-        next = next % acquirers.size();
-        return acquirers[next++];
+    while (true) {
+        Mutex::ScopedLock locker(messageLock);
+        if (messages.empty()) { 
+            QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+            addListener(c);
+            return false;
+        } else {
+            QueuedMessage msg = messages.front();
+            if (!msg.payload->isEnqueueComplete()) { 
+                QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
+                addListener(c);
+                return false;
+            }
+            
+            if (c.filter(msg.payload)) {
+                if (c.accept(msg.payload)) {            
+                    m = msg;
+                    pop();
+                    return true;
+                } else {
+                    //message(s) are available but consumer hasn't got enough credit
+                    QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                    return false;
+                }
+            } else {
+                //consumer will never want this message
+                if (canExcludeUnwanted()) {
+                    //hack for no-local on JMS topics; get rid of this message
+                    QPID_LOG(debug, "Excluding message from '" << name << "'");
+                    pop();
+                } else {
+                    //leave it for another consumer
+                    QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+                    return false;
+                }
+            } 
+        }
     }
 }
 
-bool Queue::dispatch(QueuedMessage& msg)
+
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
 {
-    QPID_LOG(info, "Dispatch message " << msg.position << " from queue " << name);
-    //additions to the acquirers will result in a separate dispatch
-    //request, so won't result in anyone being missed
-    uint counter = getAcquirerCount();
-    Consumer::ptr c = allocate();
-    while (c && counter--){
-        if (c->deliver(msg)) {
-            return true;
+    QueuedMessage msg(this);
+    while (seek(msg, c)) {
+        if (c.filter(msg.payload)) {
+            if (c.accept(msg.payload)) {
+                //consumer wants the message
+                c.position = msg.position;
+                m = msg;
+                return true;
+            } else {
+                //consumer hasn't got enough credit for the message
+                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                return false;
+            }
         } else {
-            c = allocate();
+            //consumer will never want this message, continue seeking
+            c.position = msg.position;
+            QPID_LOG(debug, "Browser skipping message from '" << name << "'");
         }
     }
     return false;
 }
 
-bool Queue::getNextMessage(QueuedMessage& msg)
+void Queue::notify()
+{
+    //notify listeners that there may be messages to process
+    for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+    listeners.clear();
+}
+
+void Queue::removeListener(Consumer& c)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (messages.empty()) { 
-        QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
-        return false;
-    } else {
-        msg = messages.front();
-        return true;
-    }
+    listeners.erase(&c);
 }
 
-void Queue::dispatch()
+void Queue::addListener(Consumer& c)
 {
-     QueuedMessage msg(this);
-     while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
-         if (dispatch(msg)) {
-             pop();
-         } else if (exclude(msg.payload)) {
-             pop();
-             dequeue(0, msg.payload);
-             QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");        
-         } else {            
-             break;
-         }        
-     }
-     serviceAllBrowsers();
-}
-
-void Queue::serviceAllBrowsers()
-{
-     Consumers copy;
-     {
-         RWlock::ScopedRlock locker(consumerLock);
-         if (browsers.empty()) return;//shortcut
-         copy = browsers;
-     }
-     for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
-         serviceBrowser(*i);
-     }
+    listeners.insert(&c);
 }
 
-void Queue::serviceBrowser(Consumer::ptr browser)
+bool Queue::dispatch(Consumer& c)
 {
     QueuedMessage msg(this);
-    while (seek(msg, browser->position) && browser->deliver(msg)) {
-        browser->position = msg.position;
+    if (getNextMessage(msg, c)) {
+        c.deliver(msg);
+        return true;
+    } else {
+        return false;
     }
 }
 
-bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
+bool Queue::seek(QueuedMessage& msg, Consumer& c) {
     Mutex::ScopedLock locker(messageLock);
-    if (!messages.empty() && messages.back().position > position) {
-        if (position < messages.front().position) {
+    if (!messages.empty() && messages.back().position > c.position) {
+        if (c.position < messages.front().position) {
             msg = messages.front();
             return true;
         } else {        
-            uint index = (position - messages.front().position) + 1;
+            uint index = (c.position - messages.front().position) + 1;
             if (index < messages.size()) {
                 msg = messages[index];
                 return true;
             } 
         }
     }
+    addListener(c);
     return false;
 }
 
-void Queue::consume(Consumer::ptr c, bool requestExclusive){
-    RWlock::ScopedWlock locker(consumerLock);
+void Queue::consume(Consumer&, bool requestExclusive){
+    Mutex::ScopedLock locker(consumerLock);
     if(exclusive) {
         throw AccessRefusedException(
             QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-    }
-    if(requestExclusive) {
-        if(acquirers.empty() && browsers.empty()) {
-            exclusive = c;
-        } else {
+    } else if(requestExclusive) {
+        if(consumerCount) {
             throw AccessRefusedException(
                 QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-        }
-    }
-    if (c->preAcquires()) {
-        acquirers.push_back(c);
-    } else {
-        Mutex::ScopedLock locker(messageLock);
-        if (messages.empty()) {
-            c->position = SequenceNumber(sequence.getValue() - 1);
         } else {
-            c->position = SequenceNumber(messages.front().position.getValue() - 1);
+            exclusive = true;
         }
-        browsers.push_back(c);
     }
+    consumerCount++;
 
     if (mgmtObject != 0){
         mgmtObject->inc_consumers ();
     }
 }
 
-void Queue::cancel(Consumer::ptr c){
-    RWlock::ScopedWlock locker(consumerLock);
-    if (c->preAcquires()) {
-        cancel(c, acquirers);
-    } else {
-        cancel(c, browsers);
-    }
+void Queue::cancel(Consumer& c){
+    removeListener(c);
+    Mutex::ScopedLock locker(consumerLock);
+    consumerCount--;
+    if(exclusive) exclusive = false;
     if (mgmtObject != 0){
         mgmtObject->dec_consumers ();
     }
-    if(exclusive == c) exclusive.reset();
-}
-
-void Queue::cancel(Consumer::ptr c, Consumers& consumers)
-{
-    Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
-    if (i != consumers.end()) 
-        consumers.erase(i);
 }
 
 QueuedMessage Queue::dequeue(){
@@ -382,14 +362,16 @@
     return count;
 }
 
+/**
+ * Assumes messageLock is held
+ */
 void Queue::pop(){
-    Mutex::ScopedLock locker(messageLock);
     if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
     messages.pop_front();
 }
 
 void Queue::push(intrusive_ptr<Message>& msg){
-    Mutex::ScopedLock locker(messageLock);
+    Mutex::ScopedLock locker(messageLock);   
     messages.push_back(QueuedMessage(this, msg, ++sequence));
     if (policy.get()) {
         policy->enqueued(msg->contentSize());
@@ -397,6 +379,7 @@
             msg->releaseContent(store);
         }
     }
+    notify();
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -412,18 +395,13 @@
 }
 
 uint32_t Queue::getConsumerCount() const{
-    RWlock::ScopedRlock locker(consumerLock);
-    return acquirers.size() + browsers.size();
-}
-
-uint32_t Queue::getAcquirerCount() const{
-    RWlock::ScopedRlock locker(consumerLock);
-    return acquirers.size();
+    Mutex::ScopedLock locker(consumerLock);
+    return consumerCount;
 }
 
 bool Queue::canAutoDelete() const{
-    RWlock::ScopedRlock locker(consumerLock);
-    return autodelete && acquirers.empty() && browsers.empty();
+    Mutex::ScopedLock locker(consumerLock);
+    return autodelete && !consumerCount;
 }
 
 // return true if store exists, 
@@ -599,21 +577,6 @@
 bool Queue::hasExclusiveConsumer() const 
 { 
     return exclusive; 
-}
-
-void Queue::DispatchFunctor::operator()()
-{
-    try {
-        if (consumer && !consumer->preAcquires()) {
-            queue.serviceBrowser(consumer);                        
-        }else{
-            queue.dispatch(); 
-        }
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Exception on dispatch: " << e.what());
-    }
-    
-    if (sync) sync->completed();
 }
 
 ManagementObject::shared_ptr Queue::GetManagementObject (void) const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Nov 29 03:54:17 2007
@@ -24,6 +24,7 @@
 #include <vector>
 #include <memory>
 #include <deque>
+#include <set>
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/amqp_types.h"
 #include "ConnectionToken.h"
@@ -48,12 +49,6 @@
 
         using std::string;
 
-        struct DispatchCompletion 
-        {
-            virtual ~DispatchCompletion() {}
-            virtual void completed() = 0;
-        };
-
         /**
          * The brokers representation of an amqp queue. Messages are
          * delivered to a queue from where they can be dispatched to
@@ -61,59 +56,40 @@
          * or more consumers registers.
          */
         class Queue : public PersistableQueue, public management::Manageable {
-            typedef std::vector<Consumer::ptr> Consumers;
+            typedef std::set<Consumer*> Listeners;
             typedef std::deque<QueuedMessage> Messages;
-            
-            struct DispatchFunctor 
-            {
-                Queue& queue;
-                Consumer::ptr consumer;
-                DispatchCompletion* sync;
-
-                DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
-                DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {}
-                void operator()();
-            };
 
             const string name;
             const bool autodelete;
             MessageStore* const store;
             const ConnectionToken* owner;
-            Consumers acquirers;
-            Consumers browsers;
+            uint32_t consumerCount;
+            bool exclusive;
+            Listeners listeners;
             Messages messages;
-            int next;
-            mutable qpid::sys::RWlock consumerLock;
+            mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
             mutable qpid::sys::Mutex ownershipLock;
-            Consumer::ptr exclusive;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;            
             QueueBindings bindings;
             boost::shared_ptr<Exchange> alternateExchange;
-            qpid::sys::Serializer<DispatchFunctor> serializer;
-            DispatchFunctor dispatchCallback;
             framing::SequenceNumber sequence;
             management::Queue::shared_ptr mgmtObject;
 
             void pop();
             void push(intrusive_ptr<Message>& msg);
-            bool dispatch(QueuedMessage& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
-            /**
-             * only called by serilizer
-             */
-            void dispatch();
-            void cancel(Consumer::ptr c, Consumers& set);
-            void serviceAllBrowsers();
-            void serviceBrowser(Consumer::ptr c);
-            Consumer::ptr allocate();
-            bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
-            uint32_t getAcquirerCount() const;
-            bool getNextMessage(QueuedMessage& msg);
-            bool exclude(intrusive_ptr<Message> msg);
- 
+            bool seek(QueuedMessage& msg, Consumer& position);
+            bool getNextMessage(QueuedMessage& msg, Consumer& c);
+            bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
+            bool browseNextMessage(QueuedMessage& msg, Consumer& c);
+            bool canExcludeUnwanted();
+
+            void notify();
+            void removeListener(Consumer&);
+            void addListener(Consumer&);
 
         public:
             virtual void notifyDurableIOComplete();
@@ -127,6 +103,8 @@
                   Manageable* parent = 0);
             ~Queue();
 
+            bool dispatch(Consumer&);
+
             void create(const qpid::framing::FieldTable& settings);
             void configure(const qpid::framing::FieldTable& settings);
             void destroy();
@@ -156,16 +134,10 @@
              * Used during recovery to add stored messages back to the queue
              */
             void recover(intrusive_ptr<Message>& msg);
-            /**
-             * Request dispatch any queued messages providing there are
-             * consumers for them. Only one thread can be dispatching
-             * at any time, so this call schedules the despatch based on
-             * the serilizer policy.
-             */
-            void requestDispatch(Consumer::ptr c = Consumer::ptr());
-            void flush(DispatchCompletion& callback);
-            void consume(Consumer::ptr c, bool exclusive = false);
-            void cancel(Consumer::ptr c);
+
+            void consume(Consumer& c, bool exclusive = false);
+            void cancel(Consumer& c);
+
             uint32_t purge();
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Nov 29 03:54:17 2007
@@ -93,7 +93,6 @@
 {
     SequenceNumber mark = incoming.getMark();
     SequenceNumberSet range = incoming.getRange();
-    Mutex::ScopedLock l(outLock);
     session.getProxy().getExecution().complete(mark.getValue(), range);
 }
 
@@ -128,7 +127,6 @@
     if (!invoker.wasHandled()) {
         throw NotImplementedException("Not implemented");
     } else if (invoker.hasResult()) {
-        Mutex::ScopedLock l(outLock);
         session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
     }
     if (method->isSync()) { 
@@ -166,7 +164,6 @@
 
 DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
 {
-    Mutex::ScopedLock l(outLock);
     SessionHandler* handler = session.getHandler();
     if (handler) {
         uint32_t maxFrameSize = handler->getConnection().getFrameMax();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Thu Nov 29 03:54:17 2007
@@ -61,7 +61,6 @@
     // state?
     IncomingExecutionContext incoming;
     framing::Window outgoing;
-    sys::Mutex outLock;
     MessageBuilder msgBuilder;
     RangedOperation ackOp;
 
@@ -93,6 +92,9 @@
     void noop();
     void result(uint32_t command, const std::string& data);
     void sync();
+
+
+    SemanticState& getSemanticState() { return state; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Nov 29 03:54:17 2007
@@ -62,7 +62,8 @@
       tagGenerator("sgen"),
       dtxSelected(false),
       accumulatedAck(0),
-      flowActive(true)
+      flowActive(true),
+      outputTasks(ss)
 {
     outstanding.reset();
 }
@@ -70,7 +71,7 @@
 SemanticState::~SemanticState() {
     //cancel all consumers
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
-        cancel(i->second);
+        cancel(*i);
     }
 
     if (dtxBuffer.get()) {
@@ -89,19 +90,19 @@
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
-    queue->consume(c, exclusive);//may throw exception
-    consumers[tagInOut] = c;
+    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
+    queue->consume(*c, exclusive);//may throw exception
+    outputTasks.addOutputTask(c.get());
+    consumers.insert(tagInOut, c.release());
 }
 
 void SemanticState::cancel(const string& tag){
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
-        cancel(i->second);
+        cancel(*i);
         consumers.erase(i); 
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
-        Mutex::ScopedLock locker(deliveryLock);   
         for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
         
     }
@@ -232,7 +233,6 @@
 
 bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     bool countOk = !prefetchCount || prefetchCount > unacked.size();
     bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
     return countOk && sizeOk;
@@ -254,37 +254,27 @@
     ackExpected(ack), 
     nolocal(_nolocal),
     acquire(_acquire),
-    blocked(false), 
+    blocked(true), 
     windowing(true), 
     msgCredit(0), 
     byteCredit(0) {}
 
 bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
 {
-    if (!parent->getSession().isAttached()) {
-        return false;
-    }
-
-    if (nolocal &&
-        &parent->getSession().getConnection() == msg.payload->getPublisher()) {
-        return false;
-    } else {
-        if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
-            blocked = true;
-        } else {
-            blocked = false;
-            Mutex::ScopedLock locker(parent->deliveryLock);
-
-            DeliveryId deliveryTag =
-                parent->deliveryAdapter.deliver(msg, token);
-            if (windowing || ackExpected) {
-                parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
-            } 
-			if (acquire && !ackExpected) {
-                queue->dequeue(0, msg.payload);
-            }
+    if (parent->getSession().isAttached() && accept(msg.payload)) {
+        allocateCredit(msg.payload);
+        DeliveryId deliveryTag =
+            parent->deliveryAdapter.deliver(msg, token);
+        if (windowing || ackExpected) {
+            parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+        } 
+        if (acquire && !ackExpected) {
+            queue->dequeue(0, msg.payload);
         }
-        return !blocked;
+        return true;
+    } else {
+        QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
+        return false;
     }
 }
 
@@ -294,35 +284,48 @@
              &parent->getSession().getConnection() == msg->getPublisher());
 }
 
+bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+{
+    //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed):
+    blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg)));
+    return !blocked;
+}
+
+void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+{
+    uint32_t originalMsgCredit = msgCredit;
+    uint32_t originalByteCredit = byteCredit;        
+    if (msgCredit != 0xFFFFFFFF) {
+        msgCredit--;
+    }
+    if (byteCredit != 0xFFFFFFFF) {
+        byteCredit -= msg->getRequiredCredit();
+    }
+    QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+             << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+             << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+    
+}
+
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
 {
-    Mutex::ScopedLock l(lock);
     if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
         QPID_LOG(debug, "Not enough credit for '" << name  << "' on " << parent 
                  << ", bytes: " << byteCredit << " msgs: " << msgCredit);
         return false;
     } else {
-        uint32_t originalMsgCredit = msgCredit;
-        uint32_t originalByteCredit = byteCredit;        
-
-        if (msgCredit != 0xFFFFFFFF) {
-            msgCredit--;
-        }
-        if (byteCredit != 0xFFFFFFFF) {
-            byteCredit -= msg->getRequiredCredit();
-        }
         QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
-                 << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
-                 << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+                 << " bytes: " << byteCredit << " msgs: " << msgCredit);
         return true;
     }
 }
 
 SemanticState::ConsumerImpl::~ConsumerImpl() {}
 
-void SemanticState::cancel(ConsumerImpl::shared_ptr c)
+void SemanticState::cancel(ConsumerImpl& c)
 {
-    Queue::shared_ptr queue = c->getQueue();
+    outputTasks.removeOutputTask(&c);
+    Queue::shared_ptr queue = c.getQueue();
     if(queue) {
         queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
@@ -374,8 +377,6 @@
 void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
 {
     {
-        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-        
         ack_iterator start = cumulative ? unacked.begin() : 
             find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
         ack_iterator end = start;
@@ -417,14 +418,14 @@
 void SemanticState::requestDispatch()
 {    
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
-        requestDispatch(i->second);
+        requestDispatch(*i);
     }
 }
 
-void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+void SemanticState::requestDispatch(ConsumerImpl& c)
 {    
-    if(c->isBlocked()) {
-        c->getQueue()->requestDispatch(c);
+    if(c.isBlocked()) {
+        c.doOutput();
     }
 }
 
@@ -433,14 +434,13 @@
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
-        i->second->acknowledged(delivery);
+        i->acknowledged(delivery);
     }
 }
 
 void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
 {
     if (windowing) {
-        Mutex::ScopedLock l(lock);
         if (msgCredit != 0xFFFFFFFF) msgCredit++;
         if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
     }
@@ -448,8 +448,6 @@
 
 void SemanticState::recover(bool requeue)
 {
-    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
     if(requeue){
         outstanding.reset();
         //take copy and clear unacked as requeue may result in redelivery to this session
@@ -470,7 +468,6 @@
 {
     QueuedMessage msg = queue->dequeue();
     if(msg.payload){
-        Mutex::ScopedLock locker(deliveryLock);
         DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
@@ -483,13 +480,11 @@
 
 DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     return deliveryAdapter.deliver(msg, token);
 }
 
 void SemanticState::flow(bool active)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     bool requestDelivery(!flowActive && active);
     flowActive = active;
     if (requestDelivery) {
@@ -499,50 +494,50 @@
 }
 
 
-SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
 {
     ConsumerImplMap::iterator i = consumers.find(destination);
     if (i == consumers.end()) {
         throw NotFoundException(QPID_MSG("Unknown destination " << destination));
     } else {
-        return i->second;
+        return *i;
     }
 }
 
 void SemanticState::setWindowMode(const std::string& destination)
 {
-    find(destination)->setWindowMode();
+    find(destination).setWindowMode();
 }
 
 void SemanticState::setCreditMode(const std::string& destination)
 {
-    find(destination)->setCreditMode();
+    find(destination).setCreditMode();
 }
 
 void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl::shared_ptr c = find(destination);
-    c->addByteCredit(value);
+    ConsumerImpl& c = find(destination);
+    c.addByteCredit(value);
     requestDispatch(c);
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl::shared_ptr c = find(destination);
-    c->addMessageCredit(value);
+    ConsumerImpl& c = find(destination);
+    c.addMessageCredit(value);
     requestDispatch(c);
 }
 
 void SemanticState::flush(const std::string& destination)
 {
-    find(destination)->flush();
+    find(destination).flush();
 }
 
 
 void SemanticState::stop(const std::string& destination)
 {
-    find(destination)->stop();
+    find(destination).stop();
 }
 
 void SemanticState::ConsumerImpl::setWindowMode()
@@ -557,7 +552,6 @@
 
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
-    Mutex::ScopedLock l(lock);
     if (byteCredit != 0xFFFFFFFF) {
         byteCredit += value;
     }
@@ -565,7 +559,6 @@
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
-    Mutex::ScopedLock l(lock);
     if (msgCredit != 0xFFFFFFFF) {
         msgCredit += value;
     }
@@ -573,16 +566,12 @@
 
 void SemanticState::ConsumerImpl::flush()
 {
-    //need to prevent delivery after requestDispatch returns but
-    //before credit is reduced to zero
-    FlushCompletion completion(*this);
-    queue->flush(completion);
-    completion.wait();
+    while(queue->dispatch(*this));
+    stop();
 }
 
 void SemanticState::ConsumerImpl::stop()
 {
-    Mutex::ScopedLock l(lock);
     msgCredit = 0;
     byteCredit = 0;
 }
@@ -618,14 +607,12 @@
 
 void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, AcquireFunctor(acquired));
 }
 
 void SemanticState::release(DeliveryId first, DeliveryId last)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
     //release results in the message being added to the head so want
     //to release in reverse order to keep the original transfer order
@@ -636,26 +623,22 @@
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)
 {
-    Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
     //need to remove the delivery records as well
     unacked.erase(range.start, range.end);
 }
 
-
-void SemanticState::FlushCompletion::wait()
+bool SemanticState::ConsumerImpl::doOutput()
 {
-    Monitor::ScopedLock locker(lock);
-    while (!complete) lock.wait();
+    //TODO: think through properly
+    return queue->dispatch(*this);
 }
 
-void SemanticState::FlushCompletion::completed()
+void SemanticState::ConsumerImpl::notify()
 {
-    Monitor::ScopedLock locker(lock);
-    consumer.stop();
-    complete = true;
-    lock.notifyAll();
+    //TODO: think through properly
+    parent->outputTasks.activateOutput();
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Nov 29 03:54:17 2007
@@ -35,6 +35,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AccumulatedAck.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/sys/AggregateOutput.h"
 #include "qpid/shared_ptr.h"
 
 #include <list>
@@ -51,11 +52,11 @@
  * attached to a channel or suspended. 
  */
 class SemanticState : public framing::FrameHandler::Chains,
+                      public sys::OutputTask,
                       private boost::noncopyable
 {
-    class ConsumerImpl : public Consumer
+    class ConsumerImpl : public Consumer, public sys::OutputTask
     {
-        sys::Mutex lock;
         SemanticState* const parent;
         const DeliveryToken::shared_ptr token;
         const string name;
@@ -69,16 +70,17 @@
         uint32_t byteCredit;
 
         bool checkCredit(intrusive_ptr<Message>& msg);
+        void allocateCredit(intrusive_ptr<Message>& msg);
 
       public:
-        typedef shared_ptr<ConsumerImpl> shared_ptr;
-
         ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
         bool deliver(QueuedMessage& msg);            
         bool filter(intrusive_ptr<Message> msg);            
+        bool accept(intrusive_ptr<Message> msg);            
+        void notify();
 
         void setWindowMode();
         void setCreditMode();
@@ -89,20 +91,11 @@
         void acknowledged(const DeliveryRecord&);    
         Queue::shared_ptr getQueue() { return queue; }
         bool isBlocked() const { return blocked; }
-    };
 
-    struct FlushCompletion : DispatchCompletion
-    {
-        sys::Monitor lock;
-        ConsumerImpl& consumer;
-        bool complete;
-        
-        FlushCompletion(ConsumerImpl& c) : consumer(c), complete(false) {}
-        void wait();
-        void completed();
+        bool doOutput();
     };
 
-    typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap;
+    typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionState& session;
@@ -114,27 +107,26 @@
     Prefetch outstanding;
     NameGenerator tagGenerator;
     std::list<DeliveryRecord> unacked;
-    sys::Mutex deliveryLock;
     TxBuffer::shared_ptr txBuffer;
     DtxBuffer::shared_ptr dtxBuffer;
     bool dtxSelected;
     DtxBufferMap suspendedXids;
     framing::AccumulatedAck accumulatedAck;
     bool flowActive;
-
     boost::shared_ptr<Exchange> cacheExchange;
+    sys::AggregateOutput outputTasks;
     
     void route(intrusive_ptr<Message> msg, Deliverable& strategy);
     void record(const DeliveryRecord& delivery);
     bool checkPrefetch(intrusive_ptr<Message>& msg);
     void checkDtxTimeout();
-    ConsumerImpl::shared_ptr find(const std::string& destination);
+    ConsumerImpl& find(const std::string& destination);
     void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
     void acknowledged(const DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
-    void requestDispatch(ConsumerImpl::shared_ptr);
-    void cancel(ConsumerImpl::shared_ptr);
+    void requestDispatch(ConsumerImpl&);
+    void cancel(ConsumerImpl&);
 
   public:
     SemanticState(DeliveryAdapter&, SessionState&);
@@ -188,6 +180,8 @@
     void release(DeliveryId first, DeliveryId last);
     void reject(DeliveryId first, DeliveryId last);
     void handle(intrusive_ptr<Message> msg);
+
+    bool doOutput() { return outputTasks.doOutput(); }
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Thu Nov 29 03:54:17 2007
@@ -70,6 +70,7 @@
                 QPID_MSG("Channel " << channel.get() << " is not open"));
     } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
+        session->detach();
         session.reset();
         peerSession.closed(e.code, e.what());
     }catch(const ConnectionException& e){
@@ -81,14 +82,9 @@
 }
 
 void SessionHandler::handleOut(AMQFrame& f) {
-    ConditionalScopedLock<Semaphore> s(suspension);
-    if (s.lockAcquired() && session.get() && session->isAttached()) {
-        channel.handle(f);          // Send it.
-        if (session->sent(f))
-            peerSession.solicitAck();
-    } else {
-        QPID_LOG(error, "Dropping frame as session is no longer attached to a channel: " << f);
-    }
+    channel.handle(f);          // Send it.
+    if (session->sent(f))
+        peerSession.solicitAck();
 }
 
 void SessionHandler::assertAttached(const char* method) const {
@@ -138,6 +134,7 @@
     assertAttached("close");
     QPID_LOG(info, "Received session.close");
     ignoring=false;
+    session->detach();
     session.reset();
     peerSession.closed(REPLY_SUCCESS, "ok");
     assert(&connection.getChannel(channel.get()) == this);
@@ -147,14 +144,15 @@
 void  SessionHandler::closed(uint16_t replyCode, const string& replyText) {
     QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
     ignoring=false;
+    session->detach();
     session.reset();
 }
 
 void SessionHandler::localSuspend() {
-    ScopedLock<Semaphore> s(suspension);
     if (session.get() && session->isAttached()) {
         session->detach();
         connection.broker.getSessionManager().suspend(session);
+        session.reset();
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Thu Nov 29 03:54:17 2007
@@ -27,8 +27,6 @@
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/ChannelHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Semaphore.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -95,7 +93,6 @@
     framing::AMQP_ClientProxy::Session peerSession;
     bool ignoring;
     std::auto_ptr<SessionState> session;
-    sys::Semaphore suspension;
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Nov 29 03:54:17 2007
@@ -30,6 +30,7 @@
 namespace broker {
 
 using namespace framing;
+using sys::Mutex;
 
 void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
 
@@ -48,7 +49,8 @@
 {
     // TODO aconway 2007-09-20: SessionManager may add plugin
     // handlers to the chain.
- }
+    getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
@@ -70,11 +72,28 @@
 }
 
 void SessionState::detach() {
+    getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+    Mutex::ScopedLock l(lock);
     handler = 0;
 }
 
 void SessionState::attach(SessionHandler& h) {
-    handler = &h;
+    {
+        Mutex::ScopedLock l(lock);
+        handler = &h;
+    }
+    h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
 }
+
+void SessionState::activateOutput()
+{
+    Mutex::ScopedLock l(lock);
+    if (isAttached()) {
+        getConnection().outputTasks.activateOutput();
+    }
+}
+    //This class could be used as the callback for queue notifications:
+    //if not attached, it can simply ignore the callback; otherwise it
+    //can pass it on to the connection.
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Nov 29 03:54:17 2007
@@ -26,6 +26,8 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SessionState.h"
 #include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/OutputControl.h"
 #include "qpid/sys/Time.h"
 
 #include <boost/noncopyable.hpp>
@@ -54,7 +56,8 @@
  * themselves have state. 
  */
 class SessionState : public framing::SessionState,
-                     public framing::FrameHandler::InOutHandler
+    public framing::FrameHandler::InOutHandler,
+    public sys::OutputControl
 {
   public:
     ~SessionState();
@@ -76,6 +79,9 @@
     Broker& getBroker() { return broker; }
     framing::ProtocolVersion getVersion() const { return version; }
 
+    /** OutputControl **/
+    void activateOutput();
+
   protected:
     void handleIn(framing::AMQFrame&);
     void handleOut(framing::AMQFrame&);
@@ -94,7 +100,7 @@
     sys::AbsTime expiry;        // Used by SessionManager.
     Broker& broker;
     framing::ProtocolVersion version;
-    
+    sys::Mutex lock;
     boost::scoped_ptr<SemanticHandler> semanticHandler;
 
   friend class SessionManager;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Nov 29 03:54:17 2007
@@ -106,7 +106,7 @@
 void Connector::send(AMQFrame& frame){
     Mutex::ScopedLock l(writeLock);
     writeFrameQueue.push(frame);
-    aio->queueWrite();
+    aio->notifyPendingWrite();
 
     QPID_LOG(trace, "SENT [" << this << "]: " << frame);
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=599395&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Thu Nov 29 03:54:17 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace sys {
+    
+void AggregateOutput::activateOutput()
+{
+    control.activateOutput();
+}
+
+bool AggregateOutput::doOutput()
+{
+    bool result = false;
+    if (!tasks.empty()) {
+        if (next >= tasks.size()) next = next % tasks.size();
+        
+        size_t start = next;
+        //loop until a task generated some output
+        while (!result) {
+            result = tasks[next++]->doOutput();
+            if (next >= tasks.size()) next = next % tasks.size();
+            if (start == next) break;
+        }
+    }
+    return result;
+}
+
+void AggregateOutput::addOutputTask(OutputTask* t)
+{
+    tasks.push_back(t);
+}
+     
+void AggregateOutput::removeOutputTask(OutputTask* t)
+{
+    TaskList::iterator i = find(tasks.begin(), tasks.end(), t);
+    if (i != tasks.end()) tasks.erase(i);
+}
+
+}} // namespace qpid::sys

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=599395&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Thu Nov 29 03:54:17 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AggregateOutput_
+#define _AggregateOutput_
+
+#include <vector>
+#include "Mutex.h"
+#include "OutputControl.h"
+#include "OutputTask.h"
+
+namespace qpid {
+namespace sys {
+
+    class AggregateOutput : public OutputTask, public OutputControl
+    {
+        typedef std::vector<OutputTask*> TaskList;
+
+        TaskList tasks;
+        size_t next;
+        OutputControl& control;
+
+    public:
+        AggregateOutput(OutputControl& c) : next(0), control(c) {};
+        //this may be called on any thread
+        void activateOutput();
+        //all the following will be called on the same thread
+        bool doOutput();
+        void addOutputTask(OutputTask* t);
+        void removeOutputTask(OutputTask* t);
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Thu Nov 29 03:54:17 2007
@@ -97,6 +97,13 @@
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
     bool queuedClose;
+    /**
+     * This flag is used to detect and handle concurrency between
+     * calls to notifyPendingWrite() (which can be made from any thread) and
+     * the execution of the writeable() method (which is always on the
+     * thread processing this handle).
+     */
+    volatile bool writePending;
 
 public:
     AsynchIO(const Socket& s,
@@ -107,7 +114,8 @@
     void start(Poller::shared_ptr poller);
     void queueReadBuffer(BufferBase* buff);
     void unread(BufferBase* buff);
-    void queueWrite(BufferBase* buff = 0);
+    void queueWrite(BufferBase* buff);
+    void notifyPendingWrite();
     void queueWriteClose();
     bool writeQueueEmpty() { return writeQueue.empty(); }
     BufferBase* getQueuedBuffer();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Nov 29 03:54:17 2007
@@ -115,6 +115,7 @@
 	// Output side
 	void send(framing::AMQFrame&);
 	void close();
+        void activateOutput();
 
 	// Input side
 	void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -135,7 +136,7 @@
     	boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
     	boost::bind(&AsynchIOHandler::eof, async, _1),
     	boost::bind(&AsynchIOHandler::disconnect, async, _1),
-		boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+        boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
     	boost::bind(&AsynchIOHandler::nobuffs, async, _1),
     	boost::bind(&AsynchIOHandler::idle, async, _1));
 	async->init(aio, handler);
@@ -195,7 +196,7 @@
 	}
 
 	// Activate aio for writing here
-	aio->queueWrite();
+	aio->notifyPendingWrite();
 }
 
 void AsynchIOHandler::close() {
@@ -203,6 +204,10 @@
 	frameQueueClosed = true;
 }
 
+void AsynchIOHandler::activateOutput() {
+    aio->notifyPendingWrite();
+}
+
 // Input side
 void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
     if (readError) {
@@ -272,9 +277,11 @@
 	ScopedLock<Mutex> l(frameQueueLock);
 	
 	if (frameQueue.empty()) {
-		// At this point we know that we're write idling the connection
-		// so we could note that somewhere or do something special
-		return;
+            // At this point we know that we're write idling the connection
+            // so tell the input handler to queue any available output:
+            inputHandler->doOutput();
+            //if still no frames, there's nothing to do:
+            if (frameQueue.empty()) return;
 	}
 	
 	do {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Thu Nov 29 03:54:17 2007
@@ -24,6 +24,7 @@
 #include "qpid/framing/InputHandler.h"
 #include "qpid/framing/InitiationHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
+#include "OutputTask.h"
 #include "TimeoutHandler.h"
 
 namespace qpid {
@@ -32,7 +33,7 @@
     class ConnectionInputHandler :
         public qpid::framing::InitiationHandler,
         public qpid::framing::InputHandler, 
-        public TimeoutHandler
+        public TimeoutHandler, public OutputTask
     {
     public:
         virtual void closed() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h Thu Nov 29 03:54:17 2007
@@ -22,6 +22,7 @@
 #define _ConnectionOutputHandler_
 
 #include "qpid/framing/OutputHandler.h"
+#include "OutputControl.h"
 
 namespace qpid {
 namespace sys {
@@ -29,7 +30,7 @@
 /**
  * Provides the output handler associated with a connection.
  */
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler 
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
 {
   public:
     virtual void close() = 0;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h?rev=599395&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h Thu Nov 29 03:54:17 2007
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputControl_
+#define _OutputControl_
+
+namespace qpid {
+namespace sys {
+
+    class OutputControl 
+    {
+    public:
+        virtual ~OutputControl() {}
+        virtual void activateOutput() = 0;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h?rev=599395&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h Thu Nov 29 03:54:17 2007
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputTask_
+#define _OutputTask_
+
+namespace qpid {
+namespace sys {
+
+    class OutputTask 
+    {
+    public:
+        virtual ~OutputTask() {}
+        virtual bool doOutput() = 0;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/OutputTask.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Thu Nov 29 03:54:17 2007
@@ -97,7 +97,8 @@
     closedCallback(cCb),
     emptyCallback(eCb),
     idleCallback(iCb),
-    queuedClose(false) {
+    queuedClose(false),
+    writePending(false) {
 
     s.setNonblocking();
 }
@@ -139,20 +140,21 @@
     DispatchHandle::rewatchRead();
 }
 
-// Either queue for writing or announce that there is something to write
-// and we should ask for it
 void AsynchIO::queueWrite(BufferBase* buff) {
-	// If no buffer then don't queue anything
-	// (but still wake up for writing) 
-	if (buff) {
-		// If we've already closed the socket then throw the write away
-		if (queuedClose) {
-			bufferQueue.push_front(buff);
-			return;
-		} else {
-    		writeQueue.push_front(buff);
-		}
-	}
+    assert(buff);
+    // If we've already closed the socket then throw the write away
+    if (queuedClose) {
+        bufferQueue.push_front(buff);
+        return;
+    } else {
+        writeQueue.push_front(buff);
+    }
+    writePending = false;
+    DispatchHandle::rewatchWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+    writePending = true;
     DispatchHandle::rewatchWrite();
 }
 
@@ -269,18 +271,24 @@
                 }
             }
         } else {
-        	// If we're waiting to close the socket then can do it now as there is nothing to write
-        	if (queuedClose) {
-        		close(h);
-        		return;
-        	}
+            // If we're waiting to close the socket then can do it now as there is nothing to write
+            if (queuedClose) {
+                close(h);
+                return;
+            }
             // Fd is writable, but nothing to write
             if (idleCallback) {
+                writePending = false;
                 idleCallback(*this);
             }
             // If we still have no buffers to write we can't do anything more
-            if (writeQueue.empty() && !queuedClose) {
+            if (writeQueue.empty() && !writePending && !queuedClose) {
                 h.unwatchWrite();
+                //the following handles the case where writePending is
+                //set to true after the test above; in this case it's
+                //possible that the unwatchWrite overwrites the
+                //desired rewatchWrite so we correct that here
+                if (writePending) h.rewatchWrite();
                 return;
             }
         }
@@ -304,7 +312,7 @@
 	h.stopWatch();
 	h.getSocket().close();
 	if (closedCallback) {
-		closedCallback(*this, getSocket());
+            closedCallback(*this, getSocket());
 	}
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Thu Nov 29 03:54:17 2007
@@ -36,6 +36,7 @@
 
 namespace qpid {
 
+using qpid::sys::ConnectionInputHandler;
 
 /**
  * A client::Connector that connects directly to an in-process broker.
@@ -54,13 +55,21 @@
 
     enum Sender {CLIENT,BROKER};
 
+    struct Task {
+        AMQFrame frame;
+        bool doOutput;
+
+        Task() : doOutput(true) {}
+        Task(AMQFrame& f) : frame(f), doOutput(false) {}
+    };
+
     /** Simulate the network thread of a peer with a queue and a thread.
      * With setInputHandler(0) drops frames simulating network packet loss.
      */
     class NetworkQueue : public sys::Runnable
     {
       public:
-        NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+        NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
             thread=sys::Thread(this);
         }
 
@@ -70,17 +79,24 @@
         }
 
         void push(AMQFrame& f) { queue.push(f); }
+        void activateOutput() { queue.push(Task()); }
 
         void run() {
             try {
                 while(true) {
-                    AMQFrame f = queue.pop();
-                    if (inputHandler) { 
-                        QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
-                        inputHandler->handle(f);
+                    Task t = queue.pop();
+                    if (t.doOutput) {
+                        if (connectionHandler) {
+                            while (connectionHandler->doOutput());
+                        }
+                    } else {
+                        if (inputHandler) { 
+                            QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
+                            inputHandler->handle(t.frame);
+                        }
+                        else 
+                            QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
                     }
-                    else 
-                        QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
                 }
             }
             catch (const ClosedException&) {
@@ -88,16 +104,24 @@
             }
         }
         
+        void setConnectionInputHandler(ConnectionInputHandler* h) {
+            Lock l(lock);
+            inputHandler = h;
+            connectionHandler = h;
+        }
+
         void setInputHandler(FrameHandler* h) {
             Lock l(lock);
             inputHandler = h;
+            connectionHandler = 0;
         }
         
       private:
         sys::Mutex lock;
-        sys::BlockingQueue<AMQFrame> queue;
+        sys::BlockingQueue<Task> queue;
         sys::Thread thread;
         FrameHandler* inputHandler;
+        ConnectionInputHandler* connectionHandler;
         const char* const receiver;
     };
 
@@ -105,11 +129,13 @@
         Sender from;
         NetworkQueue queue;
         const char* const sender;
+        NetworkQueue* reverseQueue;
 
         InProcessHandler(Sender s)
             : from(s),
               queue(from==CLIENT? "BROKER" : "CLIENT"),
-              sender(from==BROKER? "BROKER" : "CLIENT")
+              sender(from==BROKER? "BROKER" : "CLIENT"),
+              reverseQueue(0)
         {}
 
         ~InProcessHandler() {  }
@@ -123,6 +149,10 @@
             // Do not shut down the queue here, we may be in
             // the queue's dispatch thread. 
         }
+
+        void activateOutput() { 
+            if (reverseQueue) reverseQueue->activateOutput(); 
+        }
     };
 
     InProcessConnector(shared_ptr<broker::Broker> b,
@@ -135,7 +165,9 @@
         clientOut(CLIENT),
         isClosed(false)
     {
-        clientOut.queue.setInputHandler(&brokerConnection);
+        clientOut.queue.setConnectionInputHandler(&brokerConnection);
+        brokerOut.reverseQueue = &clientOut.queue;
+        clientOut.reverseQueue = &brokerOut.queue;
     }
 
     ~InProcessConnector() {
@@ -169,7 +201,7 @@
     /** Sliently discard frames sent by either party, lost network traffic. */
     void discard() {
         brokerOut.queue.setInputHandler(0);
-        clientOut.queue.setInputHandler(0);
+        clientOut.queue.setConnectionInputHandler(0);
     }
 
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=599395&r1=599394&r2=599395&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Thu Nov 29 03:54:17 2007
@@ -47,6 +47,7 @@
         received = true;
         return true;
     };
+    void notify() {}
 };
 
 class FailOnDeliver : public Deliverable
@@ -88,7 +89,7 @@
         Queue::shared_ptr queue(new Queue("my_test_queue", true));
         intrusive_ptr<Message> received;
 	
-        TestConsumer::shared_ptr c1(new TestConsumer()); 
+        TestConsumer c1;
         queue->consume(c1);
 
        
@@ -98,7 +99,7 @@
         queue->process(msg1);
 	sleep(2);
 
-        CPPUNIT_ASSERT(!c1->received);
+        CPPUNIT_ASSERT(!c1.received);
  	msg1->enqueueComplete();
 
         received = queue->dequeue().payload;
@@ -127,8 +128,8 @@
         Queue::shared_ptr queue(new Queue("my_queue", true));
     
         //Test adding consumers:
-        TestConsumer::shared_ptr c1(new TestConsumer()); 
-        TestConsumer::shared_ptr c2(new TestConsumer()); 
+        TestConsumer c1;
+        TestConsumer c2;
         queue->consume(c1);
         queue->consume(c2);
 
@@ -140,20 +141,17 @@
         intrusive_ptr<Message> msg3 = message("e", "C");
 
         queue->deliver(msg1);
-	if (!c1->received)
-	    sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get());
+        CPPUNIT_ASSERT(queue->dispatch(c1));
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
 
         queue->deliver(msg2);
-	if (!c2->received)
-	    sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get());
+        CPPUNIT_ASSERT(queue->dispatch(c2));
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
         
-	c1->received = false;
+	c1.received = false;
         queue->deliver(msg3);
-	if (!c1->received)
-	    sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get());        
+        CPPUNIT_ASSERT(queue->dispatch(c1));
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
     
         //Test cancellation:
         queue->cancel(c1);
@@ -203,13 +201,13 @@
         CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
 
-        TestConsumer::shared_ptr consumer(new TestConsumer()); 
+        TestConsumer consumer;
         queue->consume(consumer);
-        queue->requestDispatch();
-	if (!consumer->received)
+        queue->dispatch(consumer);
+	if (!consumer.received)
 	    sleep(2);
 
-        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get());
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
 
         received = queue->dequeue().payload;