You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/09/09 19:15:17 UTC

svn commit: r693518 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: gsim
Date: Tue Sep  9 10:15:17 2008
New Revision: 693518

URL: http://svn.apache.org/viewvc?rev=693518&view=rev
Log:
QPID-1261: initial fix (this degrades performance for shared queues with more than one consumer; I'll work on fixing that asap). This also moves the lock refered to in QQPID-1265 which I will update accordingly.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Sep  9 10:15:17 2008
@@ -47,7 +47,7 @@
         class Consumer {
             const bool acquires;
         public:
-            typedef shared_ptr<Consumer> ptr;            
+            typedef shared_ptr<Consumer> shared_ptr;            
 
             framing::SequenceNumber position;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep  9 10:15:17 2008
@@ -90,8 +90,12 @@
 
 void Queue::notifyDurableIOComplete()
 {
-    Mutex::ScopedLock locker(messageLock);
-    notify();
+    Listeners copy;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        listeners.swap(copy);
+    }
+    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
 
 bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -181,10 +185,14 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
-    Mutex::ScopedLock locker(messageLock);
-    msg.payload->enqueueComplete(); // mark the message as enqueued
-    messages.push_front(msg);
-    notify();
+    Listeners copy;
+    {    
+        Mutex::ScopedLock locker(messageLock);
+        msg.payload->enqueueComplete(); // mark the message as enqueued
+        messages.push_front(msg);
+        listeners.swap(copy);
+    }
+    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
@@ -203,16 +211,16 @@
     return false;
 }
 
-bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
-    if (c.preAcquires()) {
+    if (c->preAcquires()) {
         return consumeNextMessage(m, c);
     } else {
         return browseNextMessage(m, c);
     }
 }
 
-bool Queue::checkForMessages(Consumer& c)
+bool Queue::checkForMessages(Consumer::shared_ptr c)
 {
     Mutex::ScopedLock locker(messageLock);
     if (messages.empty()) {
@@ -233,12 +241,12 @@
             //message (if it does not, no need to register it for
             //notification as the consumer itself will handle the
             //credit allocation required to change this condition).
-            return c.accept(msg.payload);
+            return c->accept(msg.payload);
         }
     }
 }
 
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
@@ -254,8 +262,8 @@
                 return false;
             }
             
-            if (c.filter(msg.payload)) {
-                if (c.accept(msg.payload)) {            
+            if (c->filter(msg.payload)) {
+                if (c->accept(msg.payload)) {            
                     m = msg;
                     messages.pop_front();
                     return true;
@@ -274,14 +282,14 @@
 }
 
 
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     QueuedMessage msg(this);
     while (seek(msg, c)) {
-        if (c.filter(msg.payload)) {
-            if (c.accept(msg.payload)) {
+        if (c->filter(msg.payload)) {
+            if (c->accept(msg.payload)) {
                 //consumer wants the message
-                c.position = msg.position;
+                c->position = msg.position;
                 m = msg;
                 return true;
             } else {
@@ -291,59 +299,47 @@
             }
         } else {
             //consumer will never want this message, continue seeking
-            c.position = msg.position;
+            c->position = msg.position;
             QPID_LOG(debug, "Browser skipping message from '" << name << "'");
         }
     }
     return false;
 }
 
-/**
- * notify listeners that there may be messages to process
- */
-void Queue::notify()
-{
-    if (listeners.empty()) return;
-
-    Listeners copy(listeners);
-    listeners.clear();
-    for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
-}
-
-void Queue::removeListener(Consumer& c)
+void Queue::removeListener(Consumer::shared_ptr c)
 {
     Mutex::ScopedLock locker(messageLock);
-    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
+    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
     if (i != listeners.end()) listeners.erase(i);
 }
 
-void Queue::addListener(Consumer& c)
+void Queue::addListener(Consumer::shared_ptr c)
 {
-    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
-    if (i == listeners.end()) listeners.push_back(&c);
+    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+    if (i == listeners.end()) listeners.push_back(c);
 }
 
-bool Queue::dispatch(Consumer& c)
+bool Queue::dispatch(Consumer::shared_ptr c)
 {
     QueuedMessage msg(this);
     if (getNextMessage(msg, c)) {
-        c.deliver(msg);
+        c->deliver(msg);
         return true;
     } else {
         return false;
     }
 }
 
-bool Queue::seek(QueuedMessage& msg, Consumer& c) {
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
-    if (!messages.empty() && messages.back().position > c.position) {
-        if (c.position < messages.front().position) {
+    if (!messages.empty() && messages.back().position > c->position) {
+        if (c->position < messages.front().position) {
             msg = messages.front();
             return true;
         } else {        
             //TODO: can improve performance of this search, for now just searching linearly from end
             Messages::reverse_iterator pos;
-            for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+            for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
                 pos = i;
             }
             msg = *pos;
@@ -354,7 +350,7 @@
     return false;
 }
 
-void Queue::consume(Consumer& c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     Mutex::ScopedLock locker(consumerLock);
     if(exclusive) {
         throw ResourceLockedException(
@@ -364,7 +360,7 @@
             throw ResourceLockedException(
                 QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
         } else {
-            exclusive = c.getSession();
+            exclusive = c->getSession();
         }
     }
     consumerCount++;
@@ -372,7 +368,7 @@
         mgmtObject->inc_consumerCount ();
 }
 
-void Queue::cancel(Consumer& c){
+void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
     Mutex::ScopedLock locker(consumerLock);
     consumerCount--;
@@ -415,35 +411,40 @@
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg){
-    Mutex::ScopedLock locker(messageLock);   
-    messages.push_back(QueuedMessage(this, msg, ++sequence));
-    if (policy.get()) {
-        policy->enqueued(msg->contentSize());
-        if (policy->limitExceeded()) {
-            if (!policyExceeded) {
-                policyExceeded = true;
-                QPID_LOG(info, "Queue size exceeded policy for " << name);
-            }
-            if (store) {
-                QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
-                msg->releaseContent(store);
+    Listeners copy;
+    {
+        Mutex::ScopedLock locker(messageLock);   
+        messages.push_back(QueuedMessage(this, msg, ++sequence));
+        if (policy.get()) {
+            policy->enqueued(msg->contentSize());
+            if (policy->limitExceeded()) {
+                if (!policyExceeded) {
+                    policyExceeded = true;
+                    QPID_LOG(info, "Queue size exceeded policy for " << name);
+                }
+                if (store) {
+                    QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
+                    msg->releaseContent(store);
+                } else {
+                    QPID_LOG(error, "Message " << msg << " on " << name
+                             << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
+                    throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
+                }
             } else {
-                QPID_LOG(error, "Message " << msg << " on " << name
-                         << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
-                throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
-            }
-        } else {
-            if (policyExceeded) {
-                policyExceeded = false;
-                QPID_LOG(info, "Queue size within policy for " << name);
+                if (policyExceeded) {
+                    policyExceeded = false;
+                    QPID_LOG(info, "Queue size within policy for " << name);
+                }
             }
         }
+        listeners.swap(copy);
     }
-    notify();
+    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
 
 /** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getMessageCount() const{
+uint32_t Queue::getMessageCount() const
+{
     Mutex::ScopedLock locker(messageLock);
   
     uint32_t count =0;
@@ -454,12 +455,14 @@
     return count;
 }
 
-uint32_t Queue::getConsumerCount() const{
+uint32_t Queue::getConsumerCount() const
+{
     Mutex::ScopedLock locker(consumerLock);
     return consumerCount;
 }
 
-bool Queue::canAutoDelete() const{
+bool Queue::canAutoDelete() const
+{
     Mutex::ScopedLock locker(consumerLock);
     return autodelete && !consumerCount;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Sep  9 10:15:17 2008
@@ -34,6 +34,7 @@
 #include "qpid/management/Queue.h"
 #include "qpid/framing/amqp_types.h"
 
+#include <list>
 #include <vector>
 #include <memory>
 #include <deque>
@@ -60,7 +61,8 @@
          */
         class Queue : public boost::enable_shared_from_this<Queue>,
             public PersistableQueue, public management::Manageable {
-            typedef qpid::InlineVector<Consumer*, 5> Listeners;
+
+            typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
 
             const string name;
@@ -88,14 +90,13 @@
 
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
-            bool seek(QueuedMessage& msg, Consumer& position);
-            bool getNextMessage(QueuedMessage& msg, Consumer& c);
-            bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
-            bool browseNextMessage(QueuedMessage& msg, Consumer& c);
-
-            void notify();
-            void removeListener(Consumer&);
-            void addListener(Consumer&);
+            bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+            bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+            bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+            bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+
+            void removeListener(Consumer::shared_ptr);
+            void addListener(Consumer::shared_ptr);
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
@@ -115,14 +116,14 @@
                   management::Manageable* parent = 0);
             ~Queue();
 
-            bool dispatch(Consumer&);
+            bool dispatch(Consumer::shared_ptr);
             /**
              * Check whether there would be a message available for
              * dispatch to this consumer. If not, the consumer will be
              * notified of events that may have changed this
              * situation.
              */
-            bool checkForMessages(Consumer&);
+            bool checkForMessages(Consumer::shared_ptr);
 
             void create(const qpid::framing::FieldTable& settings);
             void configure(const qpid::framing::FieldTable& settings);
@@ -154,8 +155,8 @@
              */
             void recover(boost::intrusive_ptr<Message>& msg);
 
-            void consume(Consumer& c, bool exclusive = false);
-            void cancel(Consumer& c);
+            void consume(Consumer::shared_ptr c, bool exclusive = false);
+            void cancel(Consumer::shared_ptr c);
 
             uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Sep  9 10:15:17 2008
@@ -72,7 +72,7 @@
 SemanticState::~SemanticState() {
     //cancel all consumers
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
-        cancel(*ptr_map_ptr(i));
+        cancel(i->second);
     }
 
     if (dtxBuffer.get()) {
@@ -91,16 +91,16 @@
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
-    queue->consume(*c, exclusive);//may throw exception
+    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
+    queue->consume(c, exclusive);//may throw exception
     outputTasks.addOutputTask(c.get());
-    consumers.insert(tagInOut, c.release());
+    consumers[tagInOut] = c;
 }
 
 void SemanticState::cancel(const string& tag){
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
-        cancel(*ptr_map_ptr(i));
+        cancel(i->second);
         consumers.erase(i); 
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
@@ -260,7 +260,8 @@
     blocked(true), 
     windowing(true), 
     msgCredit(0), 
-    byteCredit(0){}
+    byteCredit(0),
+    notifyEnabled(true) {}
 
 OwnershipToken* SemanticState::ConsumerImpl::getSession()
 {
@@ -324,10 +325,11 @@
 
 SemanticState::ConsumerImpl::~ConsumerImpl() {}
 
-void SemanticState::cancel(ConsumerImpl& c)
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
-    outputTasks.removeOutputTask(&c);
-    Queue::shared_ptr queue = c.getQueue();
+    c->disableNotify();
+    outputTasks.removeOutputTask(c.get());
+    Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
@@ -358,10 +360,10 @@
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     }
 
-	if (acl && acl->doTransferAcl())
-	{
-	    if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
-	        throw NotAllowedException("ACL denied exhange publish request");
+    if (acl && acl->doTransferAcl())
+    {
+        if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
+            throw NotAllowedException("ACL denied exhange publish request");
     }
 
     cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -382,7 +384,7 @@
 void SemanticState::requestDispatch()
 {    
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
-        requestDispatch(*ptr_map_ptr(i));
+        requestDispatch(*(i->second));
     }
 }
 
@@ -402,7 +404,7 @@
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
-        ptr_map_ptr(i)->complete(delivery);
+        i->second->complete(delivery);
     }
 }
 
@@ -460,7 +462,7 @@
     if (i == consumers.end()) {
         throw NotFoundException(QPID_MSG("Unknown destination " << destination));
     } else {
-        return *ptr_map_ptr(i);
+        return *(i->second);
     }
 }
 
@@ -526,7 +528,7 @@
 
 void SemanticState::ConsumerImpl::flush()
 {
-    while(queue->dispatch(*this))
+    while(queue->dispatch(shared_from_this()))
         ;
     stop();
 }
@@ -591,19 +593,34 @@
 }
 
 bool SemanticState::ConsumerImpl::hasOutput() {
-    return queue->checkForMessages(*this);
+    return queue->checkForMessages(shared_from_this());
 }
 
 bool SemanticState::ConsumerImpl::doOutput()
 {
-    //TODO: think through properly
-    return queue->dispatch(*this);
+    return queue->dispatch(shared_from_this());
+}
+
+void SemanticState::ConsumerImpl::enableNotify()
+{
+    Mutex::ScopedLock l(lock);
+    notifyEnabled = true;
+}
+
+void SemanticState::ConsumerImpl::disableNotify()
+{
+    Mutex::ScopedLock l(lock);
+    notifyEnabled = true;
 }
 
 void SemanticState::ConsumerImpl::notify()
 {
-    //TODO: think through properly
-    parent->outputTasks.activateOutput();
+    //TODO: alter this, don't want to hold locks across external
+    //calls; for now its is required to protect the notify() from
+    //having part of the object chain of the invocation being
+    //concurrently deleted
+    Mutex::ScopedLock l(lock);
+    if (notifyEnabled) parent->outputTasks.activateOutput();
 }
 
 
@@ -644,4 +661,18 @@
     requestDispatch();
 }
 
+void SemanticState::attached()
+{
+    for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+        i->second->enableNotify();
+    }
+}
+
+void SemanticState::detached()
+{
+    for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+        i->second->disableNotify();
+    }
+}
+
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Sep  9 10:15:17 2008
@@ -37,6 +37,7 @@
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/shared_ptr.h"
 #include "AclModule.h"
 
@@ -58,8 +59,10 @@
 class SemanticState : public sys::OutputTask,
                       private boost::noncopyable
 {
-    class ConsumerImpl : public Consumer, public sys::OutputTask
+    class ConsumerImpl : public Consumer, public sys::OutputTask,
+                         public boost::enable_shared_from_this<ConsumerImpl>
     {
+        qpid::sys::Mutex lock;
         SemanticState* const parent;
         const DeliveryToken::shared_ptr token;
         const string name;
@@ -71,11 +74,14 @@
         bool windowing;
         uint32_t msgCredit;
         uint32_t byteCredit;
+        bool notifyEnabled;
 
         bool checkCredit(boost::intrusive_ptr<Message>& msg);
         void allocateCredit(boost::intrusive_ptr<Message>& msg);
 
       public:
+        typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
+
         ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
                      bool ack, bool nolocal, bool acquire);
@@ -84,6 +90,9 @@
         bool deliver(QueuedMessage& msg);            
         bool filter(boost::intrusive_ptr<Message> msg);            
         bool accept(boost::intrusive_ptr<Message> msg);            
+
+        void disableNotify();
+        void enableNotify();
         void notify();
 
         void setWindowMode();
@@ -100,7 +109,7 @@
         bool doOutput();
     };
 
-    typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
+    typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionContext& session;
@@ -130,7 +139,7 @@
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
     void requestDispatch(ConsumerImpl&);
-    void cancel(ConsumerImpl&);
+    void cancel(ConsumerImpl::shared_ptr);
 
   public:
     SemanticState(DeliveryAdapter&, SessionContext&);
@@ -187,6 +196,9 @@
     //final 0-10 spec (completed and accepted are distinct):
     void completed(DeliveryId deliveryTag, DeliveryId endTag);
     void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+
+    void attached();
+    void detached();
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Sep  9 10:15:17 2008
@@ -92,9 +92,8 @@
 }
 
 void SessionState::detach() {
-    // activateOutput can be called in a different thread, lock to protect attached status
-    Mutex::ScopedLock l(lock);
     QPID_LOG(debug, getId() << ": detached on broker.");
+    semanticState.detached();//prevents further activateOutput calls until reattached
     getConnection().outputTasks.removeOutputTask(&semanticState);
     handler = 0;
     if (mgmtObject != 0)
@@ -102,8 +101,6 @@
 }
 
 void SessionState::attach(SessionHandler& h) {
-    // activateOutput can be called in a different thread, lock to protect attached status
-    Mutex::ScopedLock l(lock);
     QPID_LOG(debug, getId() << ": attached on broker.");
     handler = &h;
     if (mgmtObject != 0)
@@ -115,8 +112,6 @@
 }
 
 void SessionState::activateOutput() {
-    // activateOutput can be called in a different thread, lock to protect attached status
-    Mutex::ScopedLock l(lock);
     if (isAttached()) 
         getConnection().outputTasks.activateOutput();
 }
@@ -273,6 +268,7 @@
 void SessionState::readyToSend() {
     QPID_LOG(debug, getId() << ": ready to send, activating output.");
     assert(handler);
+    semanticState.attached();
     sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
     tasks.addOutputTask(&semanticState);
     tasks.activateOutput();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Sep  9 10:15:17 2008
@@ -25,7 +25,6 @@
 #include "qpid/SessionState.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
-#include "qpid/sys/Mutex.h"
 #include "qpid/sys/Time.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Session.h"
@@ -117,7 +116,6 @@
     Broker& broker;
     SessionHandler* handler;    
     sys::AbsTime expiry;        // Used by SessionManager.
-    sys::Mutex lock;
     bool ignoring;
     std::string name;
     SemanticState semanticState;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=693518&r1=693517&r2=693518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Sep  9 10:15:17 2008
@@ -78,7 +78,7 @@
     Queue::shared_ptr queue(new Queue("my_test_queue", true));
     intrusive_ptr<Message> received;
     
-    TestConsumer c1;
+    TestConsumer::shared_ptr c1(new TestConsumer());
     queue->consume(c1);
     
     
@@ -88,7 +88,7 @@
     queue->process(msg1);
     sleep(2);
     
-    BOOST_CHECK(!c1.received);
+    BOOST_CHECK(!c1->received);
     msg1->enqueueComplete();
     
     received = queue->get().payload;
@@ -114,8 +114,8 @@
     Queue::shared_ptr queue(new Queue("my_queue", true));
     
     //Test adding consumers:
-    TestConsumer c1;
-    TestConsumer c2;
+    TestConsumer::shared_ptr c1(new TestConsumer());
+    TestConsumer::shared_ptr c2(new TestConsumer());
     queue->consume(c1);
     queue->consume(c2);
     
@@ -128,16 +128,16 @@
     
     queue->deliver(msg1);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg1.get(), c1.last.get());
+    BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
     
     queue->deliver(msg2);
     BOOST_CHECK(queue->dispatch(c2));
-    BOOST_CHECK_EQUAL(msg2.get(), c2.last.get());
+    BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
     
-    c1.received = false;
+    c1->received = false;
     queue->deliver(msg3);
     BOOST_CHECK(queue->dispatch(c1));
-    BOOST_CHECK_EQUAL(msg3.get(), c1.last.get());        
+    BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());        
     
     //Test cancellation:
     queue->cancel(c1);
@@ -187,13 +187,13 @@
     BOOST_CHECK_EQUAL(msg2.get(), received.get());
     BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
 
-    TestConsumer consumer;
+    TestConsumer::shared_ptr consumer(new TestConsumer());
     queue->consume(consumer);
     queue->dispatch(consumer);
-    if (!consumer.received)
+    if (!consumer->received)
         sleep(2);
 
-    BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
+    BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
     BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
 
     received = queue->get().payload;