You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/21 20:25:47 UTC

svn commit: r510161 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ lib/client/ lib/common/ lib/common/framing/ lib/common/sys/ tests/

Author: aconway
Date: Wed Feb 21 11:25:45 2007
New Revision: 510161

URL: http://svn.apache.org/viewvc?view=rev&rev=510161
Log:
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h:  const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h:  Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/.cpp:
 - added locking for consumers map and other member access.
 - refactored implementations of Basic get, deliver, return: 
   most logic now encapsulated in IncomingMessage class.
 - fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
 - const correctness & API convenience fixes.
 - getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
 - Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
 - Encapsulate *all* incoming message handling for client.
 - Moved handling of BasicGetOk to IncomingMessage to fix race.
 - Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
 - added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
 - added missing throwSelf implementations.
 - added ShutdownException as general purpose shut-down indicator.
 - added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
 - Condition variable abstraction extracted from Monitor for situations
   where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
 - Test incoming message transfer, get, consume etc.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Wed Feb 21 11:25:45 2007
@@ -42,8 +42,6 @@
     virtual ~Content(){}
     
     /** Add a block of data to the content */
-    // FIXME aconway 2007-02-07: 
-    // virtual void add(const DataBlock& data) = 0;
     virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
 
     /** Total size of content in bytes */
@@ -54,8 +52,6 @@
      * Subdivide blocks if necessary to ensure each block is
      * <= framesize bytes long.
      */
-    // FIXME aconway 2007-02-07: 
-    // virtual void send(SendFn send, u_int32_t framesize) = 0;
     virtual void send(framing::ChannelAdapter& channel, u_int32_t framesize) = 0;
 
     //FIXME aconway 2007-02-07: This is inconsistently implemented

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Wed Feb 21 11:25:45 2007
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include <iostream>
 #include <ClientChannel.h>
 #include <sys/Monitor.h>
 #include <ClientMessage.h>
@@ -29,17 +30,14 @@
 // handling of errors that should close the connection or the channel.
 // Make sure the user thread receives a connection in each case.
 //
-
-using namespace boost;          //to use dynamic_pointer_cast
+using namespace std;
+using namespace boost;
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-const std::string Channel::OK("OK");
-
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
     connection(0), 
-    incoming(0),
     prefetch(_prefetch), 
     transactional(_transactional)
 { }
@@ -106,8 +104,8 @@
         ConnectionRedirectBody::shared_ptr redirect(
             shared_polymorphic_downcast<ConnectionRedirectBody>(
                 responses.getResponse()));
-        std::cout << "Received redirection to " << redirect->getHost()
-                  << std::endl;
+        cout << "Received redirection to " << redirect->getHost()
+             << endl;
     } else {
         THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
     }
@@ -183,11 +181,11 @@
     Queue& queue, std::string& tag, MessageListener* listener, 
     int ackMode, bool noLocal, bool synch, const FieldTable* fields)
 {
-    string q = queue.getName();
     sendAndReceiveSync<BasicConsumeOkBody>(
         synch,
         new BasicConsumeBody(
-            version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+            version, 0, queue.getName(), tag, noLocal,
+            ackMode == NO_ACK, false, !synch,
             fields ? *fields : FieldTable()));
     if (synch) {
         BasicConsumeOkBody::shared_ptr response =
@@ -195,90 +193,78 @@
                 responses.getResponse());
         tag = response->getConsumerTag();
     }
-    Consumer& c = consumers[tag];
-    c.listener = listener;
-    c.ackMode = ackMode;
-    c.lastDeliveryTag = 0;
+    // FIXME aconway 2007-02-20: Race condition!
+    // We could receive the first message for the consumer
+    // before we create the consumer below.
+    // Move consumer creation to handler for BasicConsumeOkBody
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i != consumers.end())
+            THROW_QPID_ERROR(CLIENT_ERROR,
+                             "Consumer already exists with tag="+tag);
+        Consumer& c = consumers[tag];
+        c.listener = listener;
+        c.ackMode = ackMode;
+        c.lastDeliveryTag = 0;
+    }
 }
 
 void Channel::cancel(const std::string& tag, bool synch) {
-    ConsumerMap::iterator i = consumers.find(tag);
-    if (i != consumers.end()) {
-        Consumer& c = i->second;
-        if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
-            send(new BasicAckBody(version, c.lastDeliveryTag, true));
-        sendAndReceiveSync<BasicCancelOkBody>(
-            synch, new BasicCancelBody(version, tag, !synch));
-        consumers.erase(tag);
-    }
+    Consumer c;
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i == consumers.end())
+            return;
+        c = i->second;
+        consumers.erase(i);
+    }
+    if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
+        send(new BasicAckBody(version, c.lastDeliveryTag, true));
+    sendAndReceiveSync<BasicCancelOkBody>(
+        synch, new BasicCancelBody(version, tag, !synch));
 }
 
 void Channel::cancelAll(){
-    while(!consumers.empty()) {
-        Consumer c = consumers.begin()->second;
-        consumers.erase(consumers.begin());
+    ConsumerMap consumersCopy;
+    {
+        Mutex::ScopedLock l(lock);
+        consumersCopy = consumers;
+        consumers.clear();
+    }
+    for (ConsumerMap::iterator i=consumersCopy.begin();
+         i  != consumersCopy.end(); ++i)
+    {
+        Consumer& c = i->second;
         if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
             && c.lastDeliveryTag > 0)
         {
-            // Let exceptions propagate, if one fails no point
-            // trying the rest. NB no memory leaks if we do,
-            // ConsumerMap holds values, not pointers.
-            // 
             send(new BasicAckBody(version, c.lastDeliveryTag, true));
         }
     }
 }
 
-void Channel::retrieve(Message& msg){
-    Monitor::ScopedLock l(retrievalMonitor);
-    while(retrieved == 0){
-        retrievalMonitor.wait();
-    }
-
-    msg.header = retrieved->getHeader();
-    msg.deliveryTag = retrieved->getDeliveryTag();
-    msg.data = retrieved->getData();
-    delete retrieved;
-    retrieved = 0;
-}
-
 bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
-    string name = queue.getName();
-    responses.expect();
-    send(new BasicGetBody(version, 0, name, ackMode));
-    responses.waitForResponse();
-    AMQMethodBody::shared_ptr response = responses.getResponse();
-    if(response->isA<BasicGetOkBody>()) {
-        if(incoming != 0){
-            std::cout << "Existing message not complete" << std::endl;
-            // FIXME aconway 2007-01-26: close the connection? the channel?
-            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
-        }else{
-            incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
-        }
-        retrieve(msg);
-        return true;
-    }if(response->isA<BasicGetEmptyBody>()){
-        return false;
-    }else{
-        // FIXME aconway 2007-01-26: must close the connection.
-        THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
-    }
+    // Expect a message starting with a BasicGetOk
+    incoming.startGet();
+    send(new BasicGetBody(version, 0, queue.getName(), ackMode));
+    return incoming.waitGet(msg);
 }
 
     
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
-    // FIXME aconway 2007-01-30: Rework for message class.
-    
-    string e = exchange.getName();
+void Channel::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    // FIXME aconway 2007-01-30: Rework for 0-9 message class.
+    const string e = exchange.getName();
     string key = routingKey;
 
     send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
     //break msg up into header frame and content frame(s) and send these
-    string data = msg.getData();
-    msg.header->setContentSize(data.length());
     send(msg.header);
-    
+    string data = msg.getData();
     u_int64_t data_length = data.length();
     if(data_length > 0){
         u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
@@ -312,30 +298,30 @@
 {
     //channel.flow, channel.close, basic.deliver, basic.return or a
     //response to a synchronous request
-    if(responses.isWaiting()){
+    if(responses.isWaiting()) {
         responses.signalResponse(body);
-    }else if(body->isA<BasicDeliverBody>()) {
-        if(incoming != 0){
-            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
-            std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
-            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
-        }else{
-            incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
-        }
-    }else if(body->isA<BasicReturnBody>()){
-        if(incoming != 0){
-            std::cout << "Existing message not complete" << std::endl;
-            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
-        }else{
-            incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
-        }
-    }else if(body->isA<ChannelCloseBody>()){
+        return;
+    }
+
+    if(body->isA<BasicDeliverBody>()
+       || body->isA<BasicReturnBody>()
+       || body->isA<BasicGetOkBody>()
+       || body->isA<BasicGetEmptyBody>())
+        
+    {
+        incoming.add(body);
+        return;
+    }
+    else if(body->isA<ChannelCloseBody>()) {
         peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
-    }else if(body->isA<ChannelFlowBody>()){
-        // TODO aconway 2007-01-24: 
-    }else if(body->isA<ConnectionCloseBody>()){
+    }
+    else if(body->isA<ChannelFlowBody>()){
+        // TODO aconway 2007-01-24: not implemented yet.
+    }
+    else if(body->isA<ConnectionCloseBody>()){
         connection->close();
-    }else{
+    }
+    else {
         connection->close(
             504, "Unrecognised method",
             body->amqpClassId(), body->amqpMethodId());
@@ -343,31 +329,13 @@
 }
 
 void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
-    if(incoming == 0){
-        //handle invalid frame sequence
-        std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
-    }else{
-        incoming->setHeader(body);
-        if(incoming->isComplete()){ 
-            enqueue();            
-        }
-    }           
+    incoming.add(body);
 }
     
 void Channel::handleContent(AMQContentBody::shared_ptr body){
-    if(incoming == 0){
-        //handle invalid frame sequence
-        std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
-    }else{
-        incoming->addContent(body);
-        if(incoming->isComplete()){
-            enqueue();
-        }
-    }           
+    incoming.add(body);
 }
-    
+
 void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
     THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
 }
@@ -376,35 +344,6 @@
     dispatcher = Thread(this);
 }
 
-void Channel::run(){
-    dispatch();
-}
-
-void Channel::enqueue(){
-    Monitor::ScopedLock l(retrievalMonitor);
-    if(incoming->isResponse()){
-        retrieved = incoming;
-        retrievalMonitor.notify();
-    }else{
-        messages.push(incoming);
-        dispatchMonitor.notify();
-    }
-    incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
-    Monitor::ScopedLock l(dispatchMonitor);
-    while(messages.empty() && isOpen()){
-        dispatchMonitor.wait();
-    }    
-    IncomingMessage* msg = 0;
-    if(!messages.empty()){
-        msg = messages.front();
-        messages.pop();
-    }
-    return msg; 
-}
-
 void Channel::deliver(Consumer& consumer, Message& msg){
     //record delivery tag:
     consumer.lastDeliveryTag = msg.getDeliveryTag();
@@ -412,8 +351,6 @@
     //allow registered listener to handle the message
     consumer.listener->received(msg);
 
-    //if the handler calls close on the channel or connection while
-    //handling this message, then consumer will now have been deleted.
     if(isOpen()){
         bool multiple(false);
         switch(consumer.ackMode){
@@ -432,35 +369,53 @@
     //a transaction until it commits.
 }
 
-void Channel::dispatch(){
-    while(isOpen()){
-        IncomingMessage* incomingMsg = dequeue();
-        if(incomingMsg){
-            //Note: msg is currently only valid for duration of this call
-            Message msg(incomingMsg->getHeader());
-            msg.data = incomingMsg->getData();
-            if(incomingMsg->isReturn()){
-                if(returnsHandler == 0){
-                    //print warning to log/console
-                    std::cout << "Message returned: " << msg.getData() << std::endl;
-                }else{
-                    returnsHandler->returned(msg);
+void Channel::run() {
+    while(isOpen()) {
+        try {
+            Message msg = incoming.waitDispatch();
+            if(msg.getMethod()->isA<BasicReturnBody>()) {
+                ReturnedMessageHandler* handler=0;
+                {
+                    Mutex::ScopedLock l(lock);
+                    handler=returnsHandler;
                 }
-            }else{
-                msg.deliveryTag = incomingMsg->getDeliveryTag();
-                std::string tag = incomingMsg->getConsumerTag();
-                
-                if(consumers.find(tag) == consumers.end())
-                    std::cout << "Unknown consumer: " << tag << std::endl;
-                else
-                    deliver(consumers[tag], msg);
+                if(handler == 0) {
+                    // TODO aconway 2007-02-20: proper logging.
+                    cout << "Message returned: " << msg.getData() << endl;
+                }
+                else 
+                    handler->returned(msg);
+            }
+            else {
+                BasicDeliverBody::shared_ptr deliverBody =
+                    boost::shared_polymorphic_downcast<BasicDeliverBody>(
+                        msg.getMethod());
+                std::string tag = deliverBody->getConsumerTag();
+                Consumer consumer;
+                {
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i == consumers.end()) 
+                        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+                                         "Unknown consumer tag=" + tag);
+                    consumer = i->second;
+                }
+                deliver(consumer, msg);
             }
-            delete incomingMsg;
+        }
+        catch (const ShutdownException&) {
+            /* Orderly shutdown */
+        }
+        catch (const Exception& e) {
+            // FIXME aconway 2007-02-20: Report exception to user.
+            cout << "client::Channel::run() terminated by: " << e.toString()
+                 << "(" << typeid(e).name() << ")" << endl;
         }
     }
 }
 
 void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+    Mutex::ScopedLock l(lock);
     returnsHandler = handler;
 }
 
@@ -469,13 +424,17 @@
     u_int16_t code, const std::string& text,
     ClassId classId, MethodId methodId)
 {
-    if (getId() != 0 && isOpen()) {
+    if (isOpen()) {
         try {
-            sendAndReceive<ChannelCloseOkBody>(
-                new ChannelCloseBody(version, code, text, classId, methodId));
-            cancelAll();
+            if (getId() != 0) {
+                sendAndReceive<ChannelCloseOkBody>(
+                    new ChannelCloseBody(
+                        version, code, text, classId, methodId));
+            }
+            static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
             closeInternal();
         } catch (...) {
+            static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
             closeInternal();
             throw;
         }
@@ -491,14 +450,13 @@
 }
 
 void Channel::closeInternal() {
-    assert(isOpen());
+    if (isOpen())  // Guard for the block below; a trailing ';' here would make it run unconditionally.
     {
-        Monitor::ScopedLock l(dispatchMonitor);
-        static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
+        cancelAll();          // Empties the consumer map, sending a final ack for LAZY_ACK/AUTO_ACK consumers.
+        incoming.shutdown();  // NOTE(review): presumably unblocks waitDispatch()/waitGet() so run() can exit - confirm in IncomingMessage.
         connection = 0;
         // A 0 response means we are closed.
         responses.signalResponse(AMQMethodBody::shared_ptr());
-        dispatchMonitor.notify();
     }
     dispatcher.join();        
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Wed Feb 21 11:25:45 2007
@@ -89,19 +89,12 @@
         u_int64_t lastDeliveryTag;
     };
     typedef std::map<std::string, Consumer> ConsumerMap;
-    typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
     
-    static const std::string OK;
-        
+    sys::Mutex lock;
     Connection* connection;
     sys::Thread dispatcher;
-    IncomingMethods incomingMethods;
-    IncomingMessage* incoming;
+    IncomingMessage incoming;
     ResponseHandler responses;
-    std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
-    IncomingMessage* retrieved;//holds response to basic.get
-    sys::Monitor dispatchMonitor;
-    sys::Monitor retrievalMonitor;
     ConsumerMap consumers;
     ReturnedMessageHandler* returnsHandler;
 
@@ -109,10 +102,7 @@
     const bool transactional;
     framing::ProtocolVersion version;
 
-    void enqueue();
     void retrieve(Message& msg);
-    IncomingMessage* dequeue();
-    void dispatch();
     void deliver(Consumer& consumer, Message& msg);
         
     void handleHeader(framing::AMQHeaderBody::shared_ptr body);
@@ -307,7 +297,8 @@
      * receive this message on publication, the message will be
      * returned (see setReturnedMessageHandler()).
      */
-    void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, 
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
                  bool mandatory = false, bool immediate = false);
 
     /**
@@ -352,8 +343,8 @@
      * Closing a channel that is not open has no effect.
      */
     void close(
-        framing::ReplyCode = 200, const std::string& =OK,
-        framing::ClassId = 0, framing::MethodId  = 0);
+        framing::ReplyCode = 200, const std::string& ="OK",
+         framing::ClassId = 0, framing::MethodId  = 0);
 
     /**
      * Set a handler for this channel that will process any

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp Wed Feb 21 11:25:45 2007
@@ -19,7 +19,6 @@
  *
  */
 #include <ClientMessage.h>
-
 using namespace qpid::client;
 using namespace qpid::framing;
 
@@ -40,63 +39,63 @@
 Message::~Message(){
 }
 	
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* Message::getHeaderProperties() const {
     return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
 }
 
-const std::string& Message::getContentType(){ 
+const std::string& Message::getContentType() const { 
     return getHeaderProperties()->getContentType(); 
 }
 
-const std::string& Message::getContentEncoding(){ 
+const std::string& Message::getContentEncoding() const { 
     return getHeaderProperties()->getContentEncoding(); 
 }
 
-FieldTable& Message::getHeaders(){ 
+FieldTable& Message::getHeaders() const { 
     return getHeaderProperties()->getHeaders(); 
 }
 
-u_int8_t Message::getDeliveryMode(){ 
+u_int8_t Message::getDeliveryMode() const { 
     return getHeaderProperties()->getDeliveryMode(); 
 }
 
-u_int8_t Message::getPriority(){ 
+u_int8_t Message::getPriority() const { 
     return getHeaderProperties()->getPriority(); 
 }
 
-const std::string& Message::getCorrelationId(){
+const std::string& Message::getCorrelationId() const {
     return getHeaderProperties()->getCorrelationId(); 
 }
 
-const std::string& Message::getReplyTo(){ 
+const std::string& Message::getReplyTo() const { 
     return getHeaderProperties()->getReplyTo(); 
 }
 
-const std::string& Message::getExpiration(){ 
+const std::string& Message::getExpiration() const { 
     return getHeaderProperties()->getExpiration(); 
 }
 
-const std::string& Message::getMessageId(){
+const std::string& Message::getMessageId() const {
     return getHeaderProperties()->getMessageId(); 
 }
 
-u_int64_t Message::getTimestamp(){ 
+u_int64_t Message::getTimestamp() const { 
     return getHeaderProperties()->getTimestamp(); 
 }
 
-const std::string& Message::getType(){ 
+const std::string& Message::getType() const { 
     return getHeaderProperties()->getType(); 
 }
 
-const std::string& Message::getUserId(){ 
+const std::string& Message::getUserId() const { 
     return getHeaderProperties()->getUserId(); 
 }
 
-const std::string& Message::getAppId(){ 
+const std::string& Message::getAppId() const { 
     return getHeaderProperties()->getAppId(); 
 }
 
-const std::string& Message::getClusterId(){ 
+const std::string& Message::getClusterId() const { 
     return getHeaderProperties()->getClusterId(); 
 }
 
@@ -154,4 +153,10 @@
 
 void Message::setClusterId(const std::string& clusterId){ 
     getHeaderProperties()->setClusterId(clusterId); 
+}
+
+
+u_int64_t Message::getDeliveryTag() const {
+    BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get());
+    return deliver ? deliver->getDeliveryTag() : 0;
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h Wed Feb 21 11:25:45 2007
@@ -25,89 +25,99 @@
 #include <framing/amqp_framing.h>
 
 namespace qpid {
+
 namespace client {
+class IncomingMessage;
+
+/**
+ * A representation of messages sent or received through the
+ * client api.
+ *
+ * \ingroup clientapi
+ */
+class Message {
+    framing::AMQMethodBody::shared_ptr method;
+    framing::AMQHeaderBody::shared_ptr header;
+    std::string data;
+    bool redelivered;
+
+    // FIXME aconway 2007-02-20: const incorrect, needs const return type.
+    framing::BasicHeaderProperties* getHeaderProperties() const;
+    Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+        
+  public:
+    Message(const std::string& data=std::string());
+    ~Message();
+	
+    /**
+     * Allows the application to access the content of messages
+     * received.
+     * 
+     * @return a string representing the data of the message
+     */
+    std::string getData() const { return data; }
+
+    /**
+     * Allows the application to set the content of messages to be
+     * sent.
+     * 
+     * @param data a string representing the data of the message
+     */
+    void setData(const std::string& _data);
 
     /**
-     * A representation of messages for sent or recived through the
-     * client api.
-     *
-     * \ingroup clientapi
+     * @return true if this message was delivered previously (to
+     * any consumer) but was not acknowledged.
      */
-    class Message{
-	qpid::framing::AMQHeaderBody::shared_ptr header;
-        std::string data;
-	bool redelivered;
-        u_int64_t deliveryTag;
+    bool isRedelivered(){ return redelivered; }
+    void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
 
-        qpid::framing::BasicHeaderProperties* getHeaderProperties();
-	Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+    u_int64_t getDeliveryTag() const;
+
+    const std::string& getContentType() const;
+    const std::string& getContentEncoding() const;
+    qpid::framing::FieldTable& getHeaders() const;
+    u_int8_t getDeliveryMode() const;
+    u_int8_t getPriority() const;
+    const std::string& getCorrelationId() const;
+    const std::string& getReplyTo() const;
+    const std::string& getExpiration() const;
+    const std::string& getMessageId() const;
+    u_int64_t getTimestamp() const;
+    const std::string& getType() const;
+    const std::string& getUserId() const;
+    const std::string& getAppId() const;
+    const std::string& getClusterId() const;
+
+    void setContentType(const std::string& type);
+    void setContentEncoding(const std::string& encoding);
+    void setHeaders(const qpid::framing::FieldTable& headers);
+    /**
+     * Sets the delivery mode. 1 = non-durable, 2 = durable.
+     */
+    void setDeliveryMode(u_int8_t mode);
+    void setPriority(u_int8_t priority);
+    void setCorrelationId(const std::string& correlationId);
+    void setReplyTo(const std::string& replyTo);
+    void setExpiration(const std::string&  expiration);
+    void setMessageId(const std::string& messageId);
+    void setTimestamp(u_int64_t timestamp);
+    void setType(const std::string& type);
+    void setUserId(const std::string& userId);
+    void setAppId(const std::string& appId);
+    void setClusterId(const std::string& clusterId);
+
+    /** Get the method used to deliver this message */
+    boost::shared_ptr<framing::AMQMethodBody> getMethod() const
+    { return method; }
         
-    public:
-	Message(const std::string& data=std::string());
-	~Message();
-	
-        /**
-         * Allows the application to access the content of messages
-         * received.
-         * 
-         * @return a string representing the data of the message
-         */
-	std::string getData() const { return data; }
-
-        /**
-         * Allows the application to set the content of messages to be
-         * sent.
-         * 
-         * @param data a string representing the data of the message
-         */
-	void setData(const std::string& _data);
-
-        /**
-         * @return true if this message was delivered previously (to
-         * any consumer) but was not acknowledged.
-         */
-	inline bool isRedelivered(){ return redelivered; }
-	inline void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
-
-        inline u_int64_t getDeliveryTag(){ return deliveryTag; }
-
-        const std::string& getContentType();
-        const std::string& getContentEncoding();
-        qpid::framing::FieldTable& getHeaders();
-        u_int8_t getDeliveryMode();
-        u_int8_t getPriority();
-        const std::string& getCorrelationId();
-        const std::string& getReplyTo();
-        const std::string& getExpiration();
-        const std::string& getMessageId();
-        u_int64_t getTimestamp();
-        const std::string& getType();
-        const std::string& getUserId();
-        const std::string& getAppId();
-        const std::string& getClusterId();
-
-	void setContentType(const std::string& type);
-	void setContentEncoding(const std::string& encoding);
-	void setHeaders(const qpid::framing::FieldTable& headers);
-        /**
-         * Sets the delivery mode. 1 = non-durable, 2 = durable.
-         */
-	void setDeliveryMode(u_int8_t mode);
-	void setPriority(u_int8_t priority);
-	void setCorrelationId(const std::string& correlationId);
-	void setReplyTo(const std::string& replyTo);
-	void setExpiration(const std::string&  expiration);
-	void setMessageId(const std::string& messageId);
-	void setTimestamp(u_int64_t timestamp);
-	void setType(const std::string& type);
-	void setUserId(const std::string& userId);
-	void setAppId(const std::string& appId);
-	void setClusterId(const std::string& clusterId);
-
-
-        // TODO aconway 2007-02-15: remove friendships.
-      friend class Channel;
-    };
+    void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
+    boost::shared_ptr<framing::AMQHeaderBody> getHeader();
+
+    // TODO aconway 2007-02-15: remove friendships.
+  friend class IncomingMessage;
+  friend class Channel;
+};
 
 }}
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Wed Feb 21 11:25:45 2007
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
+#include <algorithm>
 #include <boost/format.hpp>
+#include <boost/bind.hpp>
 
 #include <Connection.h>
 #include <ClientChannel.h>
@@ -27,7 +29,6 @@
 #include <iostream>
 #include <sstream>
 #include <MethodBodyInstances.h>
-#include <boost/bind.hpp>
 #include <functional>
 
 using namespace qpid::framing;
@@ -83,15 +84,17 @@
 {
     if(isOpen) {
         // TODO aconway 2007-01-29: Exception handling - could end up
-        // partly closed.
+        // partly closed with threads left unjoined.
         isOpen = false;
         channel0.sendAndReceive<ConnectionCloseOkBody>(
             new ConnectionCloseBody(
                 getVersion(), code, msg, classId, methodId));
-        while(!channels.empty()) {
-            channels.begin()->second->close();
-            channels.erase(channels.begin());
-        }
+
+        using boost::bind;
+        for_each(channels.begin(), channels.end(),
+                 bind(&Channel::closeInternal,
+                             bind(&ChannelMap::value_type::second, _1)));
+        channels.clear();
         connector->close();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp Wed Feb 21 11:25:45 2007
@@ -19,58 +19,154 @@
  *
  */
 #include <IncomingMessage.h>
+#include "framing/AMQHeaderBody.h"
+#include "framing/AMQContentBody.h"
+#include "BasicGetOkBody.h"
+#include "BasicReturnBody.h"
+#include "BasicDeliverBody.h"
 #include <QpidError.h>
 #include <iostream>
 
-using namespace qpid::client;
-using namespace qpid::framing;
+namespace qpid {
+namespace client {
 
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+using namespace sys;
+using namespace framing;
 
-IncomingMessage::~IncomingMessage(){
-}
-
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
-    this->header = _header;
-}
-
-void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
-    data.append(content->getData());
-}
-
-bool IncomingMessage::isComplete(){
-    return header != 0 && header->getContentSize() == data.size();
-}
-
-bool IncomingMessage::isReturn(){
-    return returned;
-}
-
-bool IncomingMessage::isDelivery(){
-    return delivered;
-}
-
-bool IncomingMessage::isResponse(){
-    return response;
-}
-
-const string& IncomingMessage::getConsumerTag(){
-    if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
-    return delivered->getConsumerTag();
-}
-
-u_int64_t IncomingMessage::getDeliveryTag(){
-    if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
-    return delivered->getDeliveryTag();
-}
-
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
-    return header;
-}
-
-std::string IncomingMessage::getData() const {
-    return data;
+struct IncomingMessage::Guard: public Mutex::ScopedLock { // Scoped lock on im->lock that also refuses use after shutdown.
+    Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
+        im->shutdownError.throwIf();    // Runs with the lock held; throws the stored shutdown exception, if any.
+    }
+};
+
+IncomingMessage::IncomingMessage() { reset(); }
+
+void IncomingMessage::reset() {                 // Return the frame state machine to its idle state.
+    state = &IncomingMessage::expectRequest;    // Next frame must be a method body (see expectRequest).
+    endFn= &IncomingMessage::endRequest;        // A completed message will be pushed on dispatchQueue.
+    buildMessage = Message();                   // Drop anything assembled so far; get* members are untouched.
+}
+    
+void IncomingMessage::startGet() {      // Arm the state machine for a basic.get reply; throws if shut down (Guard).
+    Guard g(this);
+    if (state != &IncomingMessage::expectRequest) {     // A message is already being assembled.
+        endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); // NOTE(review): endGet() calls reset(), dropping that in-progress message - confirm intended.
+    }                                   // NOTE(review): getState is not set by the constructor or reset(); endGet() reads it on this path.
+    else {
+        state = &IncomingMessage::expectGetOk;  // First frame must be get-ok or get-empty.
+        endFn = &IncomingMessage::endGet;       // Completion hands the message to waitGet().
+        getError.reset();                       // Clear any error left by a previous get.
+        getState = GETTING;                     // waitGet() blocks while this holds.
+    }
+}
+
+bool IncomingMessage::waitGet(Message& msg) {   // Block until the get started by startGet() finishes; true if a message was received.
+    Guard g(this);                              // Takes the lock; throws if already shut down.
+    while (getState == GETTING && !shutdownError && !getError)
+        getReady.wait(lock);                    // Woken by endGet() or shutdown().
+    shutdownError.throwIf();                    // Shutdown while waiting.
+    getError.throwIf();                         // Error recorded by endGet().
+    msg = getMessage;                           // Assigned even when the result is EMPTY; only meaningful if we return true.
+    return getState==GOT;
+}
+
+Message IncomingMessage::waitDispatch() {       // Block for the next broker-initiated message; throws once shut down.
+    Guard held(this);                           // Takes the lock; throws if already shut down.
+    for (;;) {
+        shutdownError.throwIf();                // Shutdown wins even if messages remain queued.
+        if (!dispatchQueue.empty()) break;
+        dispatchReady.wait(lock);               // Woken by endRequest() or shutdown().
+    }
+    Message next(dispatchQueue.front());  dispatchQueue.pop();
+    return next;
+}
+
+void IncomingMessage::add(BodyPtr body) {   // Feed one frame body to the state machine.
+    Guard g(this);                          // Takes the lock; already throws if shut down.
+    shutdownError.throwIf();                // Redundant with the Guard constructor's check; harmless.
+    // Call the current state function (one of the expect* members); it may throw on an unexpected frame.
+    (this->*state)(body);
+}
+
+void IncomingMessage::shutdown() {      // Latch the shutdown exception and release every blocked waiter.
+    Mutex::ScopedLock l(lock);          // Plain lock rather than Guard: must not throw when already shut down.
+    shutdownError.reset(new ShutdownException());
+    getReady.notifyAll();               // notify() would wake a single thread only, leaving other waitGet() callers blocked.
+    dispatchReady.notifyAll();          // Likewise for every thread blocked in waitDispatch().
+}
+
+bool IncomingMessage::isShutdown() const {  // True once shutdown() has stored its exception. Never throws the shutdown exception itself.
+    Mutex::ScopedLock l(lock);              // lock is declared mutable so it can be taken in a const member.
+    return shutdownError;                   // Holder converts to true when it holds an exception.
+}
+
+// Downcast a frame body to the expected type T or throw; used by expectHeader, expectContent and expectRequest. Called in network thread.
+template<class T>
+boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
+    boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);    // Null if body is not a T (or body is null).
+    if (!ptr) 
+        throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+    return ptr;
+}
+
+void IncomingMessage::expectGetOk(BodyPtr body) {   // State after startGet(): accepts get-ok or get-empty only.
+    if (dynamic_cast<BasicGetOkBody*>(body.get()))
+        state = &IncomingMessage::expectHeader;     // A message follows; the get-ok body itself is not stored.
+    else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
+        getState = EMPTY;                           // Set before endGet() so it is not turned into GOT.
+        endGet();                                   // Resets the state machine and wakes waitGet().
+    }
+    else
+        throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+}
+
+void IncomingMessage::expectHeader(BodyPtr body) {  // State: the next frame must be the content header.
+    AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);    // Throws on any other body type.
+    buildMessage.header = header;
+    state = &IncomingMessage::expectContent;
+    checkComplete();                                // A declared content size of 0 completes the message immediately.
+}
+
+void IncomingMessage::expectContent(BodyPtr body) { // State: accumulate content frames until the declared size is reached.
+    AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); // Throws on any other body type.
+    buildMessage.setData(buildMessage.getData() + content->getData());     // Appends by rebuilding the whole payload each frame.
+    checkComplete();
+}
+
+void IncomingMessage::checkComplete() {         // Finish the message once the received size reaches the declared size.
+    const size_t expected = buildMessage.header->getContentSize();
+    const size_t received = buildMessage.getData().size();
+    if (received < expected) return;            // More content frames still to come.
+    Exception* overflow = 0;                    // Stays null when the sizes match exactly.
+    if (received > expected) overflow = new QPID_ERROR(
+                  PROTOCOL_ERROR, "Message content exceeds declared size.");
+    (this->*endFn)(overflow);                   // endGet or endRequest, whichever is current.
+}
+
+void IncomingMessage::expectRequest(BodyPtr body) { // Idle state: a broker-initiated message starts with a method body.
+    AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);    // Any method body is accepted; its kind is not checked here.
+    buildMessage.setMethod(method);
+    state = &IncomingMessage::expectHeader;
+}
+    
+void IncomingMessage::endGet(Exception* ex) {   // Finish a basic.get; ex (may be 0) is stored in getError.
+    getError.reset(ex);
+    if (getState == GETTING) {                  // Not taken for EMPTY, which expectGetOk sets beforehand.
+        getMessage = buildMessage;              // Copied even when ex is set; waitGet() throws before using it.
+        getState = GOT;
+    }
+    reset();                                    // Back to idle for the next message.
+    getReady.notify();                          // Wake a thread blocked in waitGet().
+}
+
+void IncomingMessage::endRequest(Exception* ex) {   // Finish a broker-initiated message; ex (may be 0) is thrown to the caller.
+    ExceptionHolder eh(ex);                         // Holds ex so it can be thrown below.
+    if (!eh) {
+        dispatchQueue.push(buildMessage);
+        reset();
+        dispatchReady.notify();                     // Wake a thread blocked in waitDispatch().
+    }
+    eh.throwIf();                                   // On error the state machine is NOT reset before throwing.
 }
 
+}} // namespace qpid::client

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h Wed Feb 21 11:25:45 2007
@@ -1,3 +1,6 @@
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,43 +22,97 @@
  *
  */
 #include <string>
-#include <vector>
+#include <queue>
 #include <framing/amqp_framing.h>
+#include "ExceptionHolder.h"
+#include "ClientMessage.h"
+#include "sys/Mutex.h"
+#include "sys/Condition.h"
 
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
+namespace qpid {
 
-#include <ClientMessage.h>
+namespace framing {
+class AMQBody;
+}
 
-namespace qpid {
 namespace client {
+/**
+ * Accumulates incoming message frames into messages.
+ * Client-initiated messages (basic.get) are initiated and made
+ * available to the user thread one at a time.
+ * 
+ * Broker initiated messages (basic.return, basic.deliver) are
+ * queued for handling by the user dispatch thread.
+ */
+class IncomingMessage {
+  public:
+    typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
+    IncomingMessage();
+    
+    /** Expect a new message starting with getOk. Called in user thread.*/
+    void startGet();
 
-    class IncomingMessage{
-        //content will be preceded by one of these method frames
-	qpid::framing::BasicDeliverBody::shared_ptr delivered;
-	qpid::framing::BasicReturnBody::shared_ptr returned;
-	qpid::framing::BasicGetOkBody::shared_ptr response;
-	qpid::framing::AMQHeaderBody::shared_ptr header;
-	std::string data;
-    public:
-	IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
-	IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
-	IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
-        ~IncomingMessage();
-	void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
-	void addContent(qpid::framing::AMQContentBody::shared_ptr content);
-	bool isComplete();
-	bool isReturn();
-	bool isDelivery();
-	bool isResponse();
-	const std::string& getConsumerTag();//only relevant if isDelivery()
-	qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
-        u_int64_t getDeliveryTag();
-	std::string getData() const;
-    };
+    /** Wait for the message to complete, return the message.
+     * Called in user thread. 
+     *@raises QpidError if there was an error.
+     */
+    bool waitGet(Message&);
 
-}
-}
+    /** Wait for the next broker-initiated message. */
+    Message waitDispatch();
+
+    /** Add a frame body to the message. Called in network thread. */
+    void add(BodyPtr);
+
+    /** Shut down: wakes waiters; later calls to startGet, waitGet, waitDispatch or add throw ShutdownException. */
+    void shutdown();
+
+    /** Check if shutdown */
+    bool isShutdown() const;
+
+  private:
+
+    typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
+    typedef void (IncomingMessage::* EndFn)(Exception*);
+    typedef std::queue<Message> MessageQueue;
+    struct Guard;
+  friend struct Guard;
+
+    void reset();
+    template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+
+    // State functions - a state machine where each state is
+    // a member function that processes a frame body.
+    void expectGetOk(BodyPtr);
+    void expectHeader(BodyPtr);
+    void expectContent(BodyPtr);
+    void expectRequest(BodyPtr);
+
+    // End functions.
+    void endGet(Exception* ex = 0);
+    void endRequest(Exception* ex);
+
+    // Check for complete message.
+    void checkComplete();
+    
+    mutable sys::Mutex lock;
+    ExpectFn state;
+    EndFn endFn;
+    Message buildMessage;
+    ExceptionHolder shutdownError;
+
+    // For basic.get messages.
+    sys::Condition getReady;
+    ExceptionHolder getError;
+    Message getMessage;
+    enum { GETTING, GOT, EMPTY } getState;
+
+    // For broker-initiated messages
+    sys::Condition dispatchReady;
+    MessageQueue dispatchQueue;
+};
+
+}}
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp Wed Feb 21 11:25:45 2007
@@ -59,16 +59,20 @@
     Monitor::ScopedLock l(monitor);
     while (waiting)
 	monitor.wait();
-    if (!response) {
-        THROW_QPID_ERROR(
-            PROTOCOL_ERROR, "Channel closed unexpectedly.");
-    }
-     if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+    getResponse(); // Check for closed.
+    if(!validate(response->amqpClassId(), response->amqpMethodId())) {
 	THROW_QPID_ERROR(
             PROTOCOL_ERROR,
             boost::format("Expected class:method %d:%d, got %d:%d")
             % c % m % response->amqpClassId() % response->amqpMethodId());
     }
+}
+
+framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() { // Return the stored response; throw if there is none.
+    if (!response) 
+        THROW_QPID_ERROR(
+            PROTOCOL_ERROR, "Channel closed unexpectedly.");
+    return response;    // Takes no lock itself; waitForResponse() calls it with the monitor held.
 }
 
 RequestId ResponseHandler::getRequestId() {

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h Wed Feb 21 11:25:45 2007
@@ -42,7 +42,7 @@
     ~ResponseHandler();
     
     bool isWaiting(){ return waiting; }
-    framing::AMQMethodBody::shared_ptr getResponse(){ return response;}
+    framing::AMQMethodBody::shared_ptr getResponse();
     void waitForResponse();
     
     void signalResponse(framing::AMQMethodBody::shared_ptr response);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp Wed Feb 21 11:25:45 2007
@@ -39,4 +39,8 @@
 
 void Exception::throwSelf() const  { throw *this; }
 
+ShutdownException::ShutdownException() : Exception("Shut down.") {}
+
+EmptyException::EmptyException() : Exception("Empty.") {}
+
 } // namespace qpid

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h Wed Feb 21 11:25:45 2007
@@ -66,6 +66,7 @@
     template <class T>
     ChannelException(framing::ReplyCode code_, const T& message)
         : Exception(message), code(code_) {}
+    void throwSelf() const { throw *this; }
 };
 
 struct ConnectionException : public Exception {
@@ -73,8 +74,23 @@
     template <class T>
     ConnectionException(framing::ReplyCode code_, const T& message)
         : Exception(message), code(code_) {}
+    void throwSelf() const { throw *this; }
 };
 
+/**
+ * Exception used to indicate that a thread should shut down.
+ * Does not indicate an error that should be signalled to the user.
+ */
+struct ShutdownException : public Exception {
+    ShutdownException();                        // Message is "Shut down." (see Exception.cpp).
+    void throwSelf() const { throw *this; }     // Throws a ShutdownException rather than a sliced Exception; presumably overrides a virtual in Exception - confirm.
+};
+
+/** Exception to indicate empty queue or other empty state */
+struct EmptyException : public Exception {
+    EmptyException();                           // Message is "Empty." (see Exception.cpp).
+    void throwSelf() const { throw *this; }     // Throws an EmptyException rather than a sliced Exception; presumably overrides a virtual in Exception - confirm.
+};
 
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h Wed Feb 21 11:25:45 2007
@@ -19,11 +19,15 @@
  *
  */
 
+#include <assert.h>
 #include <Exception.h>
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
 
+// FIXME aconway 2007-02-20:  Not necessary, a simple
+// Exception::shared_ptr will do the job. Remove
+// 
 /**
  * Holder for a heap-allocated exception that can be stack allocated
  * and thrown safely.
@@ -49,11 +53,11 @@
 
     ~ExceptionHolder() throw() {}
 
-    const char* what() const throw() { return (*this)->what(); }
-    std::string toString() const throw() { return (*this)->toString(); }
-    virtual Exception* clone() const throw() { return (*this)->clone(); }
-    virtual void throwSelf() const { (*this)->throwSelf(); }
-    virtual void throwIf() const { if (*this) (*this)->throwSelf(); }
+    const char* what() const throw() { return get()->what(); }             // Precondition: holder is non-empty (get() is dereferenced unchecked).
+    std::string toString() const throw() { return get()->toString(); }     // Same precondition as what().
+    Exception* clone() const throw() { return get()->clone(); }            // Same precondition; forwards to the held exception's clone().
+    void throwIf() const { if (get()) get()->throwSelf(); }                // Throws the held exception, or does nothing when empty.
+    void throwSelf() const { assert(get()); get()->throwSelf(); }          // Must hold an exception; checked by assert only.
 };
 
 } // namespace qpid

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h Wed Feb 21 11:25:45 2007
@@ -34,6 +34,8 @@
 
 class MethodContext;
 
+// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel.
+
 /**
  * Base class for client and broker channels.
  *

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h?view=auto&rev=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h Wed Feb 21 11:25:45 2007
@@ -0,0 +1,128 @@
+#ifndef _sys_Condition_h
+#define _sys_Condition_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+#include <sys/Mutex.h>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+#  include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A condition variable for thread synchronization. The Mutex is passed to each wait() rather than owned, so several conditions can share one lock.
+ */
+class Condition : private boost::noncopyable   // Owns an OS condition handle; a copy would destroy the same handle twice.
+{
+  public:
+    inline Condition();                 // Creates the underlying condition variable.
+    inline ~Condition();                // Destroys it.
+    inline void wait(Mutex&);           // Blocks until notified; the caller must hold the mutex.
+    inline bool wait(Mutex&, const Time& absoluteTime); // As above, but returns false once absoluteTime (a deadline, not a duration) has passed.
+    inline void notify();               // Wakes one waiting thread.
+    inline void notifyAll();            // Wakes every waiting thread.
+
+  private:
+#ifdef USE_APR
+    apr_thread_cond_t* condition;
+#else
+    pthread_cond_t condition;
+#endif
+};
+
+
+// APR ================================================================
+#ifdef USE_APR
+
+Condition::Condition() {
+    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Condition::~Condition() {
+    CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+    CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ // Returns false on timeout, true when notified.
+    // APR takes a relative interval in microseconds, so convert the deadline (already expired => 0).
+    const Time current = now();
+    const Time remaining = absoluteTime > current ? absoluteTime - current : 0;
+    apr_status_t status = apr_thread_cond_timedwait(condition, mutex.mutex, remaining/TIME_USEC);
+    if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);    // A timeout is an expected outcome, not an error.
+    return status == 0;
+}
+
+void Condition::notify(){
+    CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Condition::notifyAll(){
+    CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+#else
+// POSIX ================================================================
+
+Condition::Condition() {
+    QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+}
+
+Condition::~Condition() {
+    QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+    QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ // Returns false on timeout, true otherwise.
+    struct timespec ts;
+    toTimespec(ts, absoluteTime);       // pthread_cond_timedwait takes an absolute deadline.
+    int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts);
+    if (status != 0) {
+        if (status == ETIMEDOUT) return false;  // Deadline passed without notification.
+        throw QPID_POSIX_ERROR(status);         // Any other failure is reported as an exception.
+    }
+    return true;
+}
+
+void Condition::notify(){
+    QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+}
+
+void Condition::notifyAll(){
+    QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+}
+#endif  /*USE_APR*/
+
+
+}}
+#endif  /*!_sys_Condition_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h Wed Feb 21 11:25:45 2007
@@ -23,9 +23,7 @@
  */
 
 #include <sys/errno.h>
-#include <boost/noncopyable.hpp>
-#include <sys/Mutex.h>
-#include <sys/Time.h>
+#include <sys/Condition.h>
 
 #ifdef USE_APR
 #  include <apr_thread_cond.h>
@@ -37,91 +35,21 @@
 /**
  * A monitor is a condition variable and a mutex
  */
-class Monitor : public Mutex
-{
+class Monitor : public Mutex, public Condition {
   public:
-    inline Monitor();
-    inline ~Monitor();
+    using Condition::wait;
     inline void wait();
     inline bool wait(const Time& absoluteTime);
-    inline void notify();
-    inline void notifyAll();
-
-  private:
-#ifdef USE_APR
-    apr_thread_cond_t* condition;
-#else
-    pthread_cond_t condition;
-#endif
 };
 
 
-// APR ================================================================
-#ifdef USE_APR
-
-Monitor::Monitor() {
-    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
-}
-
-Monitor::~Monitor() {
-    CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
-}
-
-void Monitor::wait() {
-    CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
-}
-
-bool Monitor::wait(const Time& absoluteTime){
-    // APR uses microseconds.
-    apr_status_t status =
-        apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
-    if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
-    return status == 0;
-}
-
-void Monitor::notify(){
-    CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
-}
-
-void Monitor::notifyAll(){
-    CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
-}
-
-#else
-// POSIX ================================================================
-
-Monitor::Monitor() {
-    QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
-}
-
-Monitor::~Monitor() {
-    QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
-}
-
 void Monitor::wait() {
-    QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
-}
-
-bool Monitor::wait(const Time& absoluteTime){
-    struct timespec ts;
-    toTimespec(ts, absoluteTime);
-    int status = pthread_cond_timedwait(&condition, &mutex, &ts);
-    if (status != 0) {
-        if (status == ETIMEDOUT) return false;
-        throw QPID_POSIX_ERROR(status);
-    }
-    return true;
+    Condition::wait(*this);
 }
 
-void Monitor::notify(){
-    QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+bool Monitor::wait(const Time& absoluteTime) {
+    return Condition::wait(*this, absoluteTime);
 }
-
-void Monitor::notifyAll(){
-    QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
-}
-#endif  /*USE_APR*/
-
 
 }}
 #endif  /*!_sys_Monitor_h*/

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h Wed Feb 21 11:25:45 2007
@@ -32,6 +32,8 @@
 namespace qpid {
 namespace sys {
 
+class Condition;
+
 /**
  * Scoped lock template: calls lock() in ctor, unlock() in dtor.
  * L can be any class with lock() and unlock() functions.
@@ -76,6 +78,7 @@
 #else
     pthread_mutex_t mutex;
 #endif
+  friend class Condition;
 };
 
 #ifdef USE_APR

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp Wed Feb 21 11:25:45 2007
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include <vector>
 #include "qpid_test_plugin.h"
 #include "InProcessBroker.h"
 #include "ClientChannel.h"
@@ -28,6 +29,7 @@
 using namespace std;
 using namespace boost;
 using namespace qpid::client;
+using namespace qpid::sys;
 using namespace qpid::framing;
 
 /**
@@ -36,45 +38,125 @@
 class ClientChannelTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(ClientChannelTest);
-    CPPUNIT_TEST(testGet);
-    CPPUNIT_TEST(testConsume);
+    CPPUNIT_TEST(testPublishGet);
+    CPPUNIT_TEST(testGetNoContent);
+    CPPUNIT_TEST(testConsumeCancel);
+    CPPUNIT_TEST(testConsumePublished);
     CPPUNIT_TEST_SUITE_END();
 
+    struct Listener: public qpid::client::MessageListener { // Records every delivered message for the tests to inspect.
+        vector<Message> messages;       // Guarded by monitor.
+        Monitor monitor;                // Lock plus condition signalled on each delivery.
+        void received(Message& msg) {
+            Mutex::ScopedLock l(monitor);
+            messages.push_back(msg);
+            monitor.notifyAll();        // Wake a test thread waiting for messages.size() to change.
+        }
+    };
+    
     InProcessBrokerClient connection; // client::connection + local broker
     Channel channel;
-    const std::string key;
+    const std::string qname;
     const std::string data;
     Queue queue;
     Exchange exchange;
+    Listener listener;
 
   public:
 
     ClientChannelTest()
-        : key("testq"), data("hello"),
-          queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE)
+        : qname("testq"), data("hello"),
+          queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
     {
         connection.openChannel(channel);
         CPPUNIT_ASSERT(channel.getId() != 0);
         channel.declareQueue(queue);
     }
 
-    void testGet() {
-        // FIXME aconway 2007-02-16: Must fix thread safety bug
-        // in ClientChannel::get for this to pass.
-        return;
-
+    void testPublishGet() {
         Message pubMsg(data);
-        channel.publish(pubMsg, exchange, key);
+        pubMsg.getHeaders().setString("hello", "world");
+        channel.publish(pubMsg, exchange, qname);
         Message getMsg;
-        channel.get(getMsg, queue);
+        CPPUNIT_ASSERT(channel.get(getMsg, queue));
         CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
+        CPPUNIT_ASSERT_EQUAL(string("world"),
+                             getMsg.getHeaders().getString("hello"));
+        CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
     }
 
-    void testConsume() {
+    void testGetNoContent() {           // A message with headers but an empty body must round-trip through get().
+        Message pubMsg;                 // Default-constructed: no data set.
+        pubMsg.getHeaders().setString("hello", "world");
+        channel.publish(pubMsg, exchange, qname);
+        Message getMsg;
+        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(getMsg.getData().empty());
+        CPPUNIT_ASSERT_EQUAL(string("world"),
+                             getMsg.getHeaders().getString("hello"));
     }
+
+    void testConsumeCancel() {  // Deliveries reach the listener until cancel(); afterwards messages stay on the queue.
+        string tag;             // Broker assigned
+        channel.consume(queue, tag, &listener);
+        channel.start();
+        CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+        channel.publish(Message("a"), exchange, qname);
+        {
+            Mutex::ScopedLock l(listener.monitor);
+            Time deadline(now() + 1*TIME_SEC);
+            while (listener.messages.size() != 1) {
+                CPPUNIT_ASSERT(listener.monitor.wait(deadline));
+            }
+        }
+        CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
+        CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
+
+        channel.publish(Message("b"), exchange, qname);
+        channel.publish(Message("c"), exchange, qname);
+        {
+            Mutex::ScopedLock l(listener.monitor);
+            Time deadline(now() + 1*TIME_SEC);  // wait() takes an absolute time; a bare duration is already in the past.
+            while (listener.messages.size() != 3)
+                CPPUNIT_ASSERT(listener.monitor.wait(deadline));
+        }
+        CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+        CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
+        CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
+
+        channel.cancel(tag);
+        channel.publish(Message("d"), exchange, qname);
+        CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+        {
+            Mutex::ScopedLock l(listener.monitor);
+            CPPUNIT_ASSERT(!listener.monitor.wait(now() + TIME_SEC/2));    // Really wait half a second: no delivery may arrive after cancel.
+        }
+        Message msg;
+        CPPUNIT_ASSERT(channel.get(msg, queue));    // "d" was not consumed, so it is still on the queue.
+        CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
+    }
+
+    // Consume already-published messages: nothing is delivered until start().
+    void testConsumePublished() {
+        Message pubMsg("x");
+        pubMsg.getHeaders().setString("y", "z");
+        channel.publish(pubMsg, exchange, qname);
+        string tag;                     // Broker assigned
+        channel.consume(queue, tag, &listener);
+        CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+        channel.start();
+        {
+            Mutex::ScopedLock l(listener.monitor);
+            for (Time deadline(now() + 1*TIME_SEC); listener.messages.size() != 1; ) // wait() needs an absolute deadline, not a duration.
+                CPPUNIT_ASSERT(listener.monitor.wait(deadline));
+        }
+        CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData());
+        CPPUNIT_ASSERT_EQUAL(string("z"),
+                             listener.messages[0].getHeaders().getString("y"));
+    }
 
-    // FIXME aconway 2007-02-15: Cover full channel API
+    
+        
 };
 
 // Make this test suite a plugin.