You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/05 21:16:18 UTC

svn commit: r525932 - in /incubator/qpid/trunk/qpid/cpp/src: ./ client/ framing/ tests/

Author: aconway
Date: Thu Apr  5 12:16:09 2007
New Revision: 525932

URL: http://svn.apache.org/viewvc?view=rev&rev=525932
Log:
* Exteneded use of shared pointers frame bodies across all send() commands.
* tests/Makefile.am: added check-unit target to run just unit tests.

* Introduced make_shared_ptr convenience function for wrapping
  plain pointers with shared_ptr.

* cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync):
  Pass shared_ptr instead of raw ptr to fix memory problems. 
  Updated the following files to use make_shared_ptr
  - src/client/BasicMessageChannel.cpp
  - src/client/ClientConnection.cpp

* src/client/MessageMessageChannel.cpp: implemented 0-9 message.get.

* src/framing/Correlator.h,cpp: Allow request sender to register actions
  to take when the correlated response arrives.

* cpp/src/tests/FramingTest.cpp: Added Correlator tests.

* src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch
  response actions.

* cpp/src/shared_ptr.h (make_shared_ptr):  Convenience function
  to make a shared pointer from a raw pointer.

* cpp/src/tests/ClientChannelTest.cpp: Added message.get test.


* cpp/src/tests/Makefile.am (check-unit): Added test-unit target
  to run unit tests.

Added:
    incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h
    incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h
    incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Apr  5 12:16:09 2007
@@ -92,10 +92,11 @@
   $(framing)/ProtocolVersionException.cpp	\
   $(framing)/Requester.cpp			\
   $(framing)/Responder.cpp			\
+  $(framing)/Correlator.cpp			\
   $(framing)/Value.cpp				\
   $(framing)/Proxy.cpp				\
   $(gen)/AMQP_ClientProxy.cpp			\
-  $(gen)/AMQP_HighestVersion.h		\
+  $(gen)/AMQP_HighestVersion.h			\
   $(gen)/AMQP_MethodVersionMap.cpp		\
   $(gen)/AMQP_ServerProxy.cpp			\
   Exception.cpp					\

Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp Thu Apr  5 12:16:09 2007
@@ -81,10 +81,10 @@
     BasicConsumeOkBody::shared_ptr ok =
         channel.sendAndReceiveSync<BasicConsumeOkBody>(
             synch,
-            new BasicConsumeBody(
+            make_shared_ptr(new BasicConsumeBody(
                 channel.version, 0, queue.getName(), tag, noLocal,
                 ackMode == NO_ACK, false, !synch,
-                fields ? *fields : FieldTable()));
+                fields ? *fields : FieldTable())));
     tag = ok->getConsumerTag();
 }
 
@@ -102,7 +102,7 @@
     if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
         channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
     channel.sendAndReceiveSync<BasicCancelOkBody>(
-        synch, new BasicCancelBody(channel.version, tag, !synch));
+        synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
 }
 
 void BasicMessageChannel::close(){
@@ -337,9 +337,9 @@
 
 void BasicMessageChannel::setQos(){
     channel.sendAndReceive<BasicQosOkBody>(
-        new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+        make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
     if(channel.isTransactional())
-        channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+        channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp Thu Apr  5 12:16:09 2007
@@ -60,7 +60,7 @@
     init(id, con, con.getVersion()); // ChannelAdapter initialization.
     string oob;
     if (id != 0) 
-        sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+        sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
 }
 
 void Channel::protocolInit(
@@ -77,10 +77,10 @@
     string locale("en_US");
     ConnectionTuneBody::shared_ptr proposal =
         sendAndReceive<ConnectionTuneBody>(
-            new ConnectionStartOkBody(
+            make_shared_ptr(new ConnectionStartOkBody(
                 version, connectionStart->getRequestId(),
                 props, mechanism,
-                response, locale));
+                response, locale)));
 
     /**
      * Assume for now that further challenges will not be required
@@ -136,15 +136,15 @@
     FieldTable args;
     sendAndReceiveSync<ExchangeDeclareOkBody>(
         synch,
-        new ExchangeDeclareBody(
-            version, 0, name, type, false, false, false, false, !synch, args));
+        make_shared_ptr(new ExchangeDeclareBody(
+                            version, 0, name, type, false, false, false, false, !synch, args)));
 }
 
 void Channel::deleteExchange(Exchange& exchange, bool synch){
     string name = exchange.getName();
     sendAndReceiveSync<ExchangeDeleteOkBody>(
         synch,
-        new ExchangeDeleteBody(version, 0, name, false, !synch));
+        make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
 }
 
 void Channel::declareQueue(Queue& queue, bool synch){
@@ -153,9 +153,9 @@
     QueueDeclareOkBody::shared_ptr response =
         sendAndReceiveSync<QueueDeclareOkBody>(
             synch,
-            new QueueDeclareBody(
+            make_shared_ptr(new QueueDeclareBody(
                 version, 0, name, false/*passive*/, queue.isDurable(),
-                queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+                queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
     if(synch) {
         if(queue.getName().length() == 0)
             queue.setName(response->getQueue());
@@ -167,7 +167,7 @@
     string name = queue.getName();
     sendAndReceiveSync<QueueDeleteOkBody>(
         synch,
-        new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+        make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
 }
 
 void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -175,15 +175,15 @@
     string q = queue.getName();
     sendAndReceiveSync<QueueBindOkBody>(
         synch,
-        new QueueBindBody(version, 0, q, e, key,!synch, args));
+        make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
 }
 
 void Channel::commit(){
-    sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
+    sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
 }
 
 void Channel::rollback(){
-    sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
+    sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
 }
 
 void Channel::handleMethodInContext(
@@ -203,7 +203,8 @@
     }
     try {
         switch (method->amqpClassId()) {
-          case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
+          case MessageOkBody::CLASS_ID: 
+          case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
           case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
           case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
           default: throw UnknownMethod();
@@ -261,8 +262,8 @@
         try {
             if (getId() != 0) {
                 sendAndReceive<ChannelCloseOkBody>(
-                    new ChannelCloseBody(
-                        version, code, text, classId, methodId));
+                    make_shared_ptr(new ChannelCloseBody(
+                                        version, code, text, classId, methodId)));
             }
             static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
             closeInternal();
@@ -292,7 +293,7 @@
 }
 
 AMQMethodBody::shared_ptr Channel::sendAndReceive(
-    AMQMethodBody* toSend, ClassId c, MethodId m)
+    AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
 {
     responses.expect();
     send(toSend);
@@ -300,7 +301,7 @@
 }
 
 AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
-    bool sync, AMQMethodBody* body, ClassId c, MethodId m)
+    bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
 {
     if(sync)
         return sendAndReceive(body, c, m);

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h Thu Apr  5 12:16:09 2007
@@ -56,6 +56,7 @@
 {
   private:
     struct UnknownMethod {};
+    typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
         
     sys::Mutex lock;
     boost::scoped_ptr<MessageChannel> messaging;
@@ -82,21 +83,23 @@
         const std::string& vhost);
     
     framing::AMQMethodBody::shared_ptr sendAndReceive(
-        framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+        framing::AMQMethodBody::shared_ptr,
+        framing::ClassId, framing::MethodId);
 
     framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
         bool sync,
-        framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+        framing::AMQMethodBody::shared_ptr,
+        framing::ClassId, framing::MethodId);
 
     template <class BodyType>
-    boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
+    boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
         return boost::shared_polymorphic_downcast<BodyType>(
             sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
     }
 
     template <class BodyType>
     boost::shared_ptr<BodyType> sendAndReceiveSync(
-        bool sync, framing::AMQMethodBody* body) {
+        bool sync, framing::AMQMethodBody::shared_ptr body) {
         return boost::shared_polymorphic_downcast<BodyType>(
             sendAndReceiveSync(
                 sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientConnection.cpp Thu Apr  5 12:16:09 2007
@@ -87,8 +87,8 @@
         // partly closed with threads left unjoined.
         isOpen = false;
         channel0.sendAndReceive<ConnectionCloseOkBody>(
-            new ConnectionCloseBody(
-                getVersion(), code, msg, classId, methodId));
+            make_shared_ptr(new ConnectionCloseBody(
+                                getVersion(), code, msg, classId, methodId)));
 
         using boost::bind;
         for_each(channels.begin(), channels.end(),

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientMessage.h Thu Apr  5 12:16:09 2007
@@ -33,6 +33,8 @@
  *
  * \ingroup clientapi
  */
+// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
+// basic header properties.
 class Message : public framing::BasicHeaderProperties {
   public:
     Message(const std::string& data_=std::string()) : data(data_) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Thu Apr  5 12:16:09 2007
@@ -25,6 +25,7 @@
 #include "../framing/FieldTable.h"
 #include "Connection.h"
 #include "../shared_ptr.h"
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace client {
@@ -48,9 +49,9 @@
     if (tag.empty())
         tag = newTag();
     channel.sendAndReceive<MessageOkBody>(
-        new MessageConsumeBody(
+        make_shared_ptr(new MessageConsumeBody(
             channel.getVersion(), 0, queue.getName(), tag, noLocal,
-            ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+            ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
     
 //     // FIXME aconway 2007-02-20: Race condition!
 //     // We could receive the first message for the consumer
@@ -115,16 +116,44 @@
  */
 const string getDestinationId("__get__");
 
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+    void response(shared_ptr<AMQResponseBody> response) {
+        if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+            switch (response->amqpMethodId()) {
+              case MessageOkBody::METHOD_ID:
+                // Nothing to do, wait for transfer.
+                return;
+              case MessageEmptyBody::METHOD_ID:
+                empty();        // Wake up waiter with empty queue.
+                return;
+            }
+        }
+        throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+    }
+
+    Correlator::Action action() {
+        return boost::bind(&MessageGetDestination::response, this, _1);
+    }
+};
+
 bool MessageMessageChannel::get(
-    Message& , const Queue& , AckMode )
+    Message& msg, const Queue& queue, AckMode ackMode)
 {
     Mutex::ScopedLock l(lock);
-//     incoming.addDestination(getDestinationId, getDest);
-//     channel.send(
-//         new MessageGetBody(
-//             channel.version, 0, queue.getName(), getDestinationId, ackMode));
-//    return getDest.wait(msg);
-    return false;
+    std::string destName=newTag();
+    MessageGetDestination dest;
+    incoming.addDestination(destName, dest);
+    channel.send(
+        make_shared_ptr(
+            new MessageGetBody(
+                channel.version, 0, queue.getName(), destName, ackMode)),
+        dest.action());
+    return dest.wait(msg);
 }
 
 
@@ -176,9 +205,30 @@
         // FIXME aconway 2007-02-23: 
         throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
     }
-    channel.sendAndReceive<MessageOkBody>(transfer.get());
+    channel.sendAndReceive<MessageOkBody>(transfer);
 }
         
+void copy(Message& msg, MessageTransferBody& transfer) {
+    // FIXME aconway 2007-04-05: Verify all required fields
+    // are copied.
+    msg.setContentType(transfer.getContentType());
+    msg.setContentEncoding(transfer.getContentEncoding());
+    msg.setHeaders(transfer.getApplicationHeaders());
+    msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+    msg.setPriority(transfer.getPriority());
+    msg.setCorrelationId(transfer.getCorrelationId());
+    msg.setReplyTo(transfer.getReplyTo());
+    // FIXME aconway 2007-04-05: TTL/Expiration
+    msg.setMessageId(transfer.getMessageId());
+    msg.setTimestamp(transfer.getTimestamp());
+    msg.setUserId(transfer.getUserId());
+    msg.setAppId(transfer.getAppId());
+    msg.setDestination(transfer.getDestination());
+    msg.setRedelivered(transfer.getRedelivered());
+    msg.setDeliveryTag(0); // No meaning in 0-9
+    if (transfer.getBody().isInline()) 
+        msg.setData(transfer.getBody().getValue());
+}
 
 void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
     assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
@@ -203,23 +253,38 @@
           break;
       }
 
-      case MessageEmptyBody::METHOD_ID: {
-          // FIXME aconway 2007-04-04: 
-          // getDest.empty();
+      case MessageTransferBody::METHOD_ID: {
+          MessageTransferBody::shared_ptr transfer=
+              shared_polymorphic_downcast<MessageTransferBody>(method);
+          if (transfer->getBody().isInline()) {
+              Message msg;
+              copy(msg, *transfer);
+              // Deliver it.
+              incoming.getDestination(transfer->getDestination()).message(msg);
+          }
+          else {
+              Message& msg=incoming.createMessage(
+                  transfer->getDestination(), transfer->getBody().getValue());
+              copy(msg, *transfer);
+              // Will be delivered when reference closes.
+          }
           break;
       }
 
-      case MessageCancelBody::METHOD_ID:
-      case MessageCheckpointBody::METHOD_ID:
+      case MessageEmptyBody::METHOD_ID: 
+      case MessageOkBody::METHOD_ID:
+        // Nothing to do
+        break;
 
         // FIXME aconway 2007-04-03:  TODO
-      case MessageOkBody::METHOD_ID:
+      case MessageCancelBody::METHOD_ID:
+      case MessageCheckpointBody::METHOD_ID:
       case MessageOffsetBody::METHOD_ID:
       case MessageQosBody::METHOD_ID:
       case MessageRecoverBody::METHOD_ID:
       case MessageRejectBody::METHOD_ID:
       case MessageResumeBody::METHOD_ID:
-      case MessageTransferBody::METHOD_ID:
+        break;
       default:
         throw Channel::UnknownMethod();
     }
@@ -322,10 +387,10 @@
 
 void MessageMessageChannel::setQos(){
     channel.sendAndReceive<MessageOkBody>(
-        new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+        make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
     if(channel.isTransactional())
         channel.sendAndReceive<TxSelectOkBody>(
-            new TxSelectBody(channel.version));
+            make_shared_ptr(new TxSelectBody(channel.version)));
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.cpp Thu Apr  5 12:16:09 2007
@@ -35,15 +35,19 @@
     version = v;
 }
 
-RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
-    RequestId result = 0;
+RequestId ChannelAdapter::send(
+    shared_ptr<AMQBody> body, Correlator::Action action)
+{
+    RequestId requestId = 0;
     assertChannelOpen();
     switch (body->type()) {
       case REQUEST_BODY: {
           AMQRequestBody::shared_ptr request =
               boost::shared_polymorphic_downcast<AMQRequestBody>(body);
           requester.sending(request->getData());
-          result = request->getData().requestId;
+          requestId = request->getData().requestId;
+          if (!action.empty())
+              correlator.request(requestId, action);
           break;
       }
       case RESPONSE_BODY: {
@@ -52,9 +56,10 @@
           responder.sending(response->getData());
           break;
       }
+        // No action required for other body types.
     }
     out->send(new AMQFrame(getVersion(), getId(), body));
-    return result;
+    return requestId;
 }
 
 void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
@@ -66,10 +71,15 @@
 
 void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
     assertMethodOk(*response);
-    // TODO aconway 2007-01-30: Consider a response handled on receipt.
-    // Review - any cases where this is not the case?
     AMQResponseBody::Data& responseData = response->getData();
+
+    // FIXME aconway 2007-04-05: processed should be last
+    // but causes problems with InProcessBroker tests because
+    // we execute client code in handleMethod.
+    // Need to introduce a queue & 2 threads for inprocess.
     requester.processed(responseData);
+    // FIXME aconway 2007-04-04: exception handling.
+    correlator.response(response);
     handleMethod(response);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/ChannelAdapter.h Thu Apr  5 12:16:09 2007
@@ -22,11 +22,11 @@
  *
  */
 
-#include <boost/shared_ptr.hpp>
-
+#include "../shared_ptr.h"
 #include "BodyHandler.h"
 #include "Requester.h"
 #include "Responder.h"
+#include "Correlator.h"
 #include "amqp_types.h"
 
 namespace qpid {
@@ -64,17 +64,24 @@
 
     ChannelId getId() const { return id; }
     ProtocolVersion getVersion() const { return version; }
-    
+
     /**
-     * Wrap body in a frame and send the frame.
-     * Takes ownership of body.
+     * Send a frame.
+     *@param body Body of the frame.
+     *@param action optional action to execute when we receive a
+     *response to this frame.  Ignored if body is not a Request.
+     *@return If body is a request, the ID assigned else 0.
      */
-    RequestId send(AMQBody::shared_ptr body);
+    RequestId send(shared_ptr<AMQBody> body,
+                   Correlator::Action action=Correlator::Action());
+
+    // TODO aconway 2007-04-05:  remove and use make_shared_ptr at call sites.
+    /**@deprecated Use make_shared_ptr with the other send() override */
     RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
 
-    void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
-    void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
-    void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+    void handleMethod(shared_ptr<AMQMethodBody>);
+    void handleRequest(shared_ptr<AMQRequestBody>);
+    void handleResponse(shared_ptr<AMQResponseBody>);
 
     virtual bool isOpen() const = 0;
     
@@ -84,7 +91,7 @@
     void assertChannelNotOpen() const;
 
     virtual void handleMethodInContext(
-        boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+        shared_ptr<AMQMethodBody> method,
         const MethodContext& context) = 0;
 
     RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
@@ -97,6 +104,7 @@
     ProtocolVersion version;
     Requester requester;
     Responder responder;
+    Correlator correlator;
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp?view=auto&rev=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp Thu Apr  5 12:16:09 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+namespace qpid {
+namespace framing {
+
+void Correlator::request(RequestId id, Action action) {
+    actions[id] = action;
+}
+
+bool Correlator::response(shared_ptr<AMQResponseBody> r) {
+    Actions::iterator begin = actions.lower_bound(r->getRequestId());
+    Actions::iterator end =
+        actions.upper_bound(r->getRequestId()+r->getBatchOffset());
+    bool didAction = false;
+    for(Actions::iterator i=begin; i != end; ++i) {
+        // FIXME aconway 2007-04-04: Exception handling.
+        didAction = true;
+        i->second(r);
+        actions.erase(i);
+    }
+    return didAction;
+}
+
+}} // namespace qpid::framing

Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h?view=auto&rev=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h Thu Apr  5 12:16:09 2007
@@ -0,0 +1,68 @@
+#ifndef _framing_Correlator_h
+#define _framing_Correlator_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "../shared_ptr.h"
+#include "../framing/AMQResponseBody.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Correlate responses with actions established when sending the request.
+ *
+ * THREAD  UNSAFE.
+ */
+class Correlator
+{
+  public:
+    typedef shared_ptr<AMQResponseBody> ResponsePtr;
+    typedef boost::function<void (ResponsePtr)> Action;
+    
+    /**
+     * Note that request with id was sent, record an action to call
+     * when a response arrives.
+     */
+    void request(RequestId id, Action doOnResponse);
+
+    /**
+     * Note response received, call action for associated request if any.
+     * Return true of some action(s) were executed.
+     */
+    bool response(shared_ptr<AMQResponseBody>);
+
+    /**
+     * Note the given execution mark was received, call actions
+     * for any requests that are impicitly responded to.
+     */
+    void mark(RequestId mark);
+
+  private:
+    typedef std::map<RequestId, Action> Actions;
+    Actions actions;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif  /*!_framing_Correlator_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/framing/Correlator.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/framing/Requester.h Thu Apr  5 12:16:09 2007
@@ -32,8 +32,7 @@
 /**
  * Manage request IDs and the response mark for locally initiated requests.
  *
- * THREAD UNSAFE: This class is called as frames are sent or received
- * sequentially on a connection, so it does not need to be thread safe.
+ * THREAD UNSAFE: must be locked externally.
  */
 class Requester
 {
@@ -46,12 +45,14 @@
     /** Called after processing a response. */
     void processed(const AMQResponseBody::Data&);
 
-	/** Get the next request id to be used. */
-	RequestId getNextId() { return lastId + 1; }
-	/** Get the first request acked by this response */
-	RequestId getFirstAckRequest() { return firstAckRequest; }
-	/** Get the last request acked by this response */
-	RequestId getLastAckRequest() { return lastAckRequest; }
+    /** Get the next request id to be used. */
+    RequestId getNextId() { return lastId + 1; }
+
+    /** Get the first request acked by last response */
+    RequestId getFirstAckRequest() { return firstAckRequest; }
+
+    /** Get the last request acked by last response */
+    RequestId getLastAckRequest() { return lastAckRequest; }
 
   private:
     RequestId lastId;

Modified: incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/shared_ptr.h Thu Apr  5 12:16:09 2007
@@ -23,12 +23,20 @@
 #include <boost/cast.hpp>
 
 namespace qpid {
-/// Import shared_ptr definitions into qpid namespace.
+
+// Import shared_ptr definitions into qpid namespace and define some
+// useful shared_ptr templates for convenience.
+
 using boost::shared_ptr;
 using boost::dynamic_pointer_cast;
 using boost::static_pointer_cast;
 using boost::const_pointer_cast;
 using boost::shared_polymorphic_downcast;
+
+template <class T> shared_ptr<T> make_shared_ptr(T* ptr) {
+    return shared_ptr<T>(ptr);
+}
+
 } // namespace qpid
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Thu Apr  5 12:16:09 2007
@@ -27,6 +27,7 @@
 #include "../client/ClientExchange.h"
 #include "../client/MessageListener.h"
 #include "../client/BasicMessageChannel.h"
+#include "../client/MessageMessageChannel.h"
 
 using namespace std;
 using namespace boost;
@@ -203,7 +204,17 @@
     }
 };
 
+class MessageClientChannelTest : public ClientChannelTestBase {
+    CPPUNIT_TEST_SUITE(MessageClientChannelTest);
+    CPPUNIT_TEST(testPublishGet);
+    CPPUNIT_TEST_SUITE_END();
+  public:
+    MessageClientChannelTest() {
+        channel.reset(new Channel(false, 500, Channel::AMQP_09));
+    }
+};
 
 // Make this test suite a plugin.
 CPPUNIT_PLUGIN_IMPLEMENT();
 CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageClientChannelTest);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Thu Apr  5 12:16:09 2007
@@ -18,9 +18,6 @@
  * under the License.
  *
  */
-#include <memory>
-#include <boost/lexical_cast.hpp>
-
 #include "ConnectionRedirectBody.h"
 #include "../framing/ProtocolVersion.h"
 #include "../framing/amqp_framing.h"
@@ -38,6 +35,11 @@
 #include "../client/Connection.h"
 #include "../client/ClientExchange.h"
 #include "../client/ClientQueue.h"
+#include "../framing/Correlator.h"
+#include "BasicGetOkBody.h"
+#include <memory>
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
 
 using namespace qpid;
 using namespace qpid::framing;
@@ -65,6 +67,7 @@
     CPPUNIT_TEST(testResponseBodyFrame);
     CPPUNIT_TEST(testRequester);
     CPPUNIT_TEST(testResponder);
+    CPPUNIT_TEST(testCorrelator);
     CPPUNIT_TEST(testInlineContent);
     CPPUNIT_TEST(testContentReference);
     CPPUNIT_TEST(testContentValidation);
@@ -300,7 +303,7 @@
         Responder r;
         AMQRequestBody::Data q;
         AMQResponseBody::Data p;
-
+        
         q.requestId = 1;
         q.responseMark = 0;
         r.received(q);
@@ -333,6 +336,48 @@
 
         // TODO aconway 2007-01-14: Test for batching when supported.
         
+    }
+
+
+    std::vector<Correlator::ResponsePtr> correlations;
+
+    void correlatorCallback(Correlator::ResponsePtr r) {
+        correlations.push_back(r);
+    }
+
+    struct DummyResponse : public AMQResponseBody {
+        DummyResponse(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+            : AMQResponseBody(version, id, req, off) {}
+        uint32_t size() const { return 0; }
+        void print(std::ostream&) const {}
+        MethodId amqpMethodId() const { return 0; }
+        ClassId  amqpClassId() const { return 0; }
+        void encodeContent(Buffer& ) const {}
+        void decodeContent(Buffer& ) {}
+    };
+
+    void testCorrelator() {
+        CPPUNIT_ASSERT(correlations.empty());
+        Correlator c;
+        Correlator::Action action = boost::bind(&FramingTest::correlatorCallback, this, _1);
+        c.request(5, action);
+        Correlator::ResponsePtr r1(new DummyResponse(3, 5, 0));
+        CPPUNIT_ASSERT(c.response(r1));
+        CPPUNIT_ASSERT_EQUAL(size_t(1), correlations.size());
+        CPPUNIT_ASSERT(correlations.front() == r1);
+        correlations.clear();
+
+        c.request(6, action);
+        c.request(7, action);
+        c.request(8, action);
+        Correlator::ResponsePtr r2(new DummyResponse(4, 6, 3));
+        CPPUNIT_ASSERT(c.response(r2));
+        CPPUNIT_ASSERT_EQUAL(size_t(3), correlations.size());
+        CPPUNIT_ASSERT(r2 == correlations[0]);
+        CPPUNIT_ASSERT(r2 == correlations[1]);
+        CPPUNIT_ASSERT(r2 == correlations[2]);
+        Correlator::ResponsePtr r3(new DummyResponse(5, 99, 0));
+        CPPUNIT_ASSERT(!c.response(r3));
     }
 
     // expect may contain null chars so use string(ptr,size) constructor

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=525932&r1=525931&r2=525932
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Apr  5 12:16:09 2007
@@ -110,6 +110,9 @@
 
 check: .valgrindrc $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker) 
 
+check-unit:
+	$(MAKE) check TESTS=run-unit-tests
+
 # Create a copy so user can modify without risk of checking in their mods.
 .valgrindrc: .valgrindrc-default
 	cp .valgrindrc-default .valgrindrc