You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/30 19:20:01 UTC

svn commit: r501502 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/client/ lib/common/framing/ lib/common/sys/apr/ tests/

Author: aconway
Date: Tue Jan 30 10:20:00 2007
New Revision: 501502

URL: http://svn.apache.org/viewvc?view=rev&rev=501502
Log:
* client/* framing/*: fixed client-side request ID processing.

* cpp/tests/InProcessBroker.h: For tests: connect to an in-process
  broker directly, bypass the network. Keeps log of client/broker
  conversation for verification in test code.

* cpp/tests/FramingTest.cpp (testRequestResponseRoundtrip):
  Client/broker round-trip test for request/reponse IDs and response mark.

* APRAcceptor.cpp (APRAcceptor): fixed valgrind uninitialized error.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRAcceptor.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Tue Jan 30 10:20:00 2007
@@ -85,13 +85,9 @@
      connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
     **/
 
-    connection->send(
-        new AMQFrame(
-            version, 0,
-            new ConnectionTuneOkBody(
-                version, proposal->getChannelMax(),
-                connection->getMaxFrameSize(),
-                proposal->getHeartbeat())));
+    (new ConnectionTuneOkBody(
+        version, proposal->getChannelMax(), connection->getMaxFrameSize(),
+        proposal->getHeartbeat()))->send(context);
     
     u_int16_t heartbeat = proposal->getHeartbeat();
     connection->connector->setReadTimeout(heartbeat * 2);
@@ -100,9 +96,8 @@
     // Send connection open.
     std::string capabilities;
     responses.expect();
-    send(new AMQFrame(
-             version, 0,
-             new ConnectionOpenBody(version, vhost, capabilities, true)));
+    (new ConnectionOpenBody(version, vhost, capabilities, true))
+        ->send(context);
     //receive connection.open-ok (or redirect, but ignore that for now
     //esp. as using force=true).
     responses.waitForResponse();
@@ -213,7 +208,8 @@
     if (i != consumers.end()) {
         Consumer& c = i->second;
         if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
-            send(new BasicAckBody(version, c.lastDeliveryTag, true));
+            (new BasicAckBody(version, c.lastDeliveryTag, true))
+                ->send(context);
         sendAndReceiveSync<BasicCancelOkBody>(
             synch, new BasicCancelBody(version, tag, !synch));
         consumers.erase(tag);
@@ -231,7 +227,8 @@
             // trying the rest. NB no memory leaks if we do,
             // ConsumerMap holds values, not pointers.
             // 
-            send(new BasicAckBody(version, c.lastDeliveryTag, true));
+            (new BasicAckBody(version, c.lastDeliveryTag, true))
+                ->send(context);
         }
     }
 }
@@ -251,9 +248,8 @@
 
 bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
     string name = queue.getName();
-    AMQBody::shared_ptr  body(new BasicGetBody(version, 0, name, ackMode));
     responses.expect();
-    send(body);
+    (new BasicGetBody(version, 0, name, ackMode))->send(context);
     responses.waitForResponse();
     AMQMethodBody::shared_ptr response = responses.getResponse();
     if(response->isA<BasicGetOkBody>()) {
@@ -276,10 +272,12 @@
 
     
 void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+    // FIXME aconway 2007-01-30: Rework for message class.
+    
     string e = exchange.getName();
     string key = routingKey;
 
-    send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
+    (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context);
     //break msg up into header frame and content frame(s) and send these
     string data = msg.getData();
     msg.header->setContentSize(data.length());
@@ -428,7 +426,8 @@
             if(++(consumer.count) < prefetch) break;
             //else drop-through
           case AUTO_ACK:
-            send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+            (new BasicAckBody(version, msg.getDeliveryTag(), multiple))
+                ->send(context);
             consumer.lastDeliveryTag = 0;
         }
     }
@@ -510,20 +509,20 @@
     dispatcher.join();        
 }
 
-void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
 {
     responses.expect();
-    send(toSend);
+    toSend->send(context);
     responses.receive(c, m);
 }
 
 void Channel::sendAndReceiveSync(
-    bool sync, AMQBody* body, ClassId c, MethodId m)
+    bool sync, AMQMethodBody* body, ClassId c, MethodId m)
 {
     if(sync)
         sendAndReceive(body, c, m);
     else
-        send(body);
+        body->send(context);
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Tue Jan 30 10:20:00 2007
@@ -124,21 +124,21 @@
         const std::string& vhost);
     
     void sendAndReceive(
-        framing::AMQBody*, framing::ClassId, framing::MethodId);
+        framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
 
     void sendAndReceiveSync(
         bool sync,
-        framing::AMQBody*, framing::ClassId, framing::MethodId);
+        framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
 
     template <class BodyType>
-    boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+    boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
         sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
         return boost::shared_polymorphic_downcast<BodyType>(
             responses.getResponse());
     }
 
     template <class BodyType> void sendAndReceiveSync(
-        bool sync, framing::AMQBody* body) {
+        bool sync, framing::AMQMethodBody* body) {
         sendAndReceiveSync(
             sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
     }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Tue Jan 30 10:20:00 2007
@@ -27,6 +27,8 @@
 #include <iostream>
 #include <sstream>
 #include <MethodBodyInstances.h>
+#include <boost/bind.hpp>
+#include <functional>
 
 using namespace qpid::framing;
 using namespace qpid::sys;
@@ -41,45 +43,59 @@
 const std::string Connection::OK("OK");
 
 Connection::Connection(
-    bool debug, u_int32_t _max_frame_size,
+    bool _debug, u_int32_t _max_frame_size,
     const framing::ProtocolVersion& _version
-) : max_frame_size(_max_frame_size), closed(true),
-    version(_version)
-{
-    connector = new Connector(version, debug, _max_frame_size);
-}
+) : version(_version), max_frame_size(_max_frame_size),
+    defaultConnector(version, debug, max_frame_size),
+    connector(&defaultConnector),
+    isOpen(false), debug(_debug)
+{}
 
 Connection::~Connection(){
-    delete connector;
+    close();
 }
 
-void Connection::open(
-    const std::string& _host, int _port, const std::string& uid,
-    const std::string& pwd, const std::string& virtualhost)
+void Connection::setConnector(Connector& con)
 {
-
-    host = _host;
-    port = _port;
+    connector = &con;
     connector->setInputHandler(this);
     connector->setTimeoutHandler(this);
     connector->setShutdownHandler(this);
     out = connector->getOutputHandler();
+}
+
+void Connection::open(
+    const std::string& host, int port,
+    const std::string& uid, const std::string& pwd, const std::string& vhost)
+{
+    if (isOpen)
+        THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
     connector->connect(host, port);
-    
-    // Open the special channel 0.
     channels[0] = &channel0;
     channel0.open(0, *this);
-    channel0.protocolInit(uid, pwd, virtualhost);
+    channel0.protocolInit(uid, pwd, vhost);
+    isOpen = true;
 }
 
+void Connection::shutdown() {
+    close();
+}
+        
 void Connection::close(
     ReplyCode code, const string& msg, ClassId classId, MethodId methodId
 )
 {
-    if(!closed) {
+    if(isOpen) {
+        // TODO aconway 2007-01-29: Exception handling - could end up
+        // partly closed.
+        isOpen = false;
         channel0.sendAndReceive<ConnectionCloseOkBody>(
             new ConnectionCloseBody(
                 getVersion(), code, msg, classId, methodId));
+        while(!channels.empty()) {
+            channels.begin()->second->close();
+            channels.erase(channels.begin());
+        }
         connector->close();
     }
 }
@@ -138,16 +154,6 @@
 
 void Connection::idleOut(){
     out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
-}
-
-void Connection::shutdown(){
-    closed = true;
-    //close all channels, also removes them from the map.
-    while(!channels.empty()){
-        Channel* channel = channels.begin()->second;
-        if (channel != 0)
-            channel->close();
-    }
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Tue Jan 30 10:20:00 2007
@@ -1,5 +1,5 @@
-#ifndef _Connection_
-#define _Connection_
+#ifndef _client_Connection_
+#define _client_Connection_
 
 /*
  *
@@ -89,19 +89,19 @@
     static framing::ChannelId channelIdCounter;
     static const std::string OK;
         
-    std::string host;
-    int port;
+    framing::ProtocolVersion version;
     const u_int32_t max_frame_size;
-    ChannelMap channels; 
+    ChannelMap channels;
+    Connector defaultConnector;
     Connector* connector;
     framing::OutputHandler* out;
-    volatile bool closed;
-    framing::ProtocolVersion version;
+    volatile bool isOpen;
         
     void erase(framing::ChannelId);
     void channelException(
         Channel&, framing::AMQMethodBody*, const QpidError&);
     Channel channel0;
+    bool debug;
 
     // TODO aconway 2007-01-26: too many friendships, untagle these classes.
   friend class Channel;
@@ -145,10 +145,10 @@
      * within a single broker).
      */
     void open(const std::string& host, int port = 5672, 
-              const std::string& uid = "guest", const std::string& pwd = "guest", 
+              const std::string& uid = "guest",
+              const std::string& pwd = "guest", 
               const std::string& virtualhost = "/");
 
-
     /**
      * Close the connection with optional error information for the peer.
      *
@@ -177,7 +177,10 @@
     void idleOut();
     void idleIn();
     void shutdown();
-
+    
+    /**\internal used for testing */
+    void setConnector(Connector& connector);
+    
     /**
      * @return the maximum frame size in use on this connection
      */
@@ -187,8 +190,7 @@
     const framing::ProtocolVersion& getVersion() { return version; }
 };
 
-}
-}
+}} // namespace qpid::client
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h Tue Jan 30 10:20:00 2007
@@ -37,13 +37,13 @@
 
 namespace client {
 
-class Connector : public qpid::framing::OutputHandler, 
-                  private qpid::sys::Runnable
+class Connector : public framing::OutputHandler, 
+                  private sys::Runnable
 {
     const bool debug;
     const int receive_buffer_size;
     const int send_buffer_size;
-    qpid::framing::ProtocolVersion version;
+    framing::ProtocolVersion version;
 
     bool closed;
 
@@ -53,22 +53,22 @@
     u_int32_t idleIn;
     u_int32_t idleOut;
 
-    qpid::sys::TimeoutHandler* timeoutHandler;
-    qpid::sys::ShutdownHandler* shutdownHandler;
-    qpid::framing::InputHandler* input;
-    qpid::framing::InitiationHandler* initialiser;
-    qpid::framing::OutputHandler* output;
+    sys::TimeoutHandler* timeoutHandler;
+    sys::ShutdownHandler* shutdownHandler;
+    framing::InputHandler* input;
+    framing::InitiationHandler* initialiser;
+    framing::OutputHandler* output;
 	
-    qpid::framing::Buffer inbuf;
-    qpid::framing::Buffer outbuf;
+    framing::Buffer inbuf;
+    framing::Buffer outbuf;
 
-    qpid::sys::Mutex writeLock;
-    qpid::sys::Thread receiver;
+    sys::Mutex writeLock;
+    sys::Thread receiver;
 
-    qpid::sys::Socket socket;
+    sys::Socket socket;
 
     void checkIdle(ssize_t status);
-    void writeBlock(qpid::framing::AMQDataBlock* data);
+    void writeBlock(framing::AMQDataBlock* data);
     void writeToSocket(char* data, size_t available);
     void setSocketTimeout();
 
@@ -77,23 +77,22 @@
 
   friend class Channel;
   public:
-    Connector(const qpid::framing::ProtocolVersion& pVersion,
+    Connector(const framing::ProtocolVersion& pVersion,
               bool debug = false, u_int32_t buffer_size = 1024);
     virtual ~Connector();
     virtual void connect(const std::string& host, int port);
     virtual void init();
     virtual void close();
-    virtual void setInputHandler(qpid::framing::InputHandler* handler);
-    virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
-    virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
-    virtual qpid::framing::OutputHandler* getOutputHandler();
-    virtual void send(qpid::framing::AMQFrame* frame);
+    virtual void setInputHandler(framing::InputHandler* handler);
+    virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
+    virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+    virtual framing::OutputHandler* getOutputHandler();
+    virtual void send(framing::AMQFrame* frame);
     virtual void setReadTimeout(u_int16_t timeout);
     virtual void setWriteTimeout(u_int16_t timeout);
 };
 
-}
-}
+}}
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp Tue Jan 30 10:20:00 2007
@@ -29,6 +29,7 @@
     id = i;
     out = &o;
     version = v;
+    context = MethodContext(id, this);
 }
 
 void ChannelAdapter::send(AMQFrame* frame) {
@@ -58,19 +59,21 @@
 void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
     assertMethodOk(*request);
     responder.received(request->getData());
-    MethodContext context(id, this, request->getRequestId());
+    context =MethodContext(id, this, request->getRequestId());
     handleMethodInContext(request, context);
 }
 
 void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
     assertMethodOk(*response);
-    handleMethod(response);
+    // TODO aconway 2007-01-30: Consider a response handled on receipt.
+    // Review - any cases where this is not the case?
     requester.processed(response->getData());
+    handleMethod(response);
 }
 
 void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
     assertMethodOk(*method);
-    MethodContext context(id, this);
+    context = MethodContext(id, this);
     handleMethodInContext(method, context);
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h Tue Jan 30 10:20:00 2007
@@ -54,7 +54,7 @@
     /**
      *@param output Processed frames are forwarded to this handler.
      */
-    ChannelAdapter() : id(0), out(0) {}
+    ChannelAdapter() : context(0), id(0), out(0) {}
 
     /** Initialize the channel adapter. */
     void init(ChannelId, OutputHandler&, const ProtocolVersion&);
@@ -92,6 +92,9 @@
 
     RequestId getRequestInProgress() { return requestInProgress; }
 
+  protected:
+    MethodContext context;
+    
   private:
     ChannelId id;
     OutputHandler* out;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h Tue Jan 30 10:20:00 2007
@@ -50,7 +50,7 @@
         : channelId(channel), out(output), requestId(request){}
 
     /** \internal Channel on which the method is sent. */
-    const ChannelId channelId;
+    ChannelId channelId;
 
     /** Output handler for responses in this context */
     OutputHandler* out;
@@ -58,7 +58,7 @@
     /** \internal If we are in the context of processing an incoming request,
      * this is the ID. Otherwise it is 0.
      */ 
-    const RequestId requestId;
+    RequestId requestId;
 
 };
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRAcceptor.cpp?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRAcceptor.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRAcceptor.cpp Tue Jan 30 10:20:00 2007
@@ -56,10 +56,11 @@
 // Must define Acceptor virtual dtor.
 Acceptor::~Acceptor() {}
 
-    APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
     port(port_),
     trace(trace_),
-    processor(APRPool::get(), threads, 1000, 5000000)
+    processor(APRPool::get(), threads, 1000, 5000000),
+    running(false)
 {
     apr_sockaddr_t* address;
     CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp?view=diff&rev=501502&r1=501501&r2=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp Tue Jan 30 10:20:00 2007
@@ -18,6 +18,9 @@
  * under the License.
  *
  */
+#include <memory>
+#include <boost/lexical_cast.hpp>
+
 #include <ConnectionRedirectBody.h>
 #include <ProtocolVersion.h>
 #include <amqp_framing.h>
@@ -25,15 +28,20 @@
 #include <qpid_test_plugin.h>
 #include <sstream>
 #include <typeinfo>
+#include <QpidError.h>
 #include <AMQP_HighestVersion.h>
 #include "AMQRequestBody.h"
 #include "AMQResponseBody.h"
 #include "Requester.h"
 #include "Responder.h"
-#include <QpidError.h>
+#include "InProcessBroker.h"
+#include "client/Connection.h"
+#include "client/ClientExchange.h"
+#include "client/ClientQueue.h"
 
+using namespace qpid;
 using namespace qpid::framing;
-using qpid::QpidError;
+using namespace std;
 
 template <class T>
 std::string tostring(const T& x) 
@@ -60,6 +68,7 @@
     CPPUNIT_TEST(testInlineContent);
     CPPUNIT_TEST(testContentReference);
     CPPUNIT_TEST(testContentValidation);
+    CPPUNIT_TEST(testRequestResponseRoundtrip);
     CPPUNIT_TEST_SUITE_END();
 
   private:
@@ -324,7 +333,43 @@
         // TODO aconway 2007-01-14: Test for batching when supported.
         
     }
-};
+
+    // expect may contain null chars so use string(ptr,size) constructor
+    // Use sizeof(expect)-1 to strip the trailing null.
+#define ASSERT_FRAME(expect, frame) \
+    CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame))
+
+    void testRequestResponseRoundtrip() {
+        broker::InProcessBroker ibroker(version);
+        client::Connection clientConnection;
+        clientConnection.setConnector(ibroker);
+        clientConnection.open("");
+        client::Channel c;
+        clientConnection.openChannel(c);
+
+        client::Exchange exchange(
+            "MyExchange", client::Exchange::TOPIC_EXCHANGE);
+        client::Queue queue("MyQueue", true);
+        c.declareExchange(exchange);
+        c.declareQueue(queue);
+        c.bind(exchange, queue, "MyTopic", framing::FieldTable());
+        broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
+        ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+        ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++);
+        ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++);
+    }
+ };
 
 
 // Make this test suite a plugin.

Added: incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h?view=auto&rev=501502
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h Tue Jan 30 10:20:00 2007
@@ -0,0 +1,153 @@
+#ifndef _tests_InProcessBroker_h
+#define _tests_InProcessBroker_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+#include <algorithm>
+
+#include "framing/AMQFrame.h"
+#include "broker/Broker.h"
+#include "broker/Connection.h"
+#include "client/Connector.h"
+
+namespace qpid {
+namespace broker {
+
+/** Make a copy of a frame body. Inefficient, only intended for tests. */
+// TODO aconway 2007-01-29: from should be const, need to fix
+// AMQPFrame::encode as const.
+framing::AMQFrame copy(framing::AMQFrame& from) {
+    framing::Buffer buffer(from.size());
+    from.encode(buffer);
+    buffer.flip();
+    framing::AMQFrame result;
+    result.decode(buffer);
+    return result;
+}
+
+/**
+ * A broker that implements client::Connector allowing direct
+ * in-process connection of client to broker. Used to write round-trip
+ * tests without requiring an external broker process.
+ *
+ * Also allows you to "snoop" on frames exchanged between client & broker.
+ * 
+ * Use as follows:
+ *
+ \code
+        broker::InProcessBroker ibroker(version);
+        client::Connection clientConnection;
+        clientConnection.setConnector(ibroker);
+        clientConnection.open("");
+        ... use as normal
+ \endcode
+ *
+ */
+class InProcessBroker : public client::Connector {
+  public:
+    enum Sender {CLIENT,BROKER};
+    struct Frame : public framing::AMQFrame {
+        Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
+        bool fromBroker() const { return from == BROKER; }
+        bool fromClient() const { return from == CLIENT; }
+
+        template <class MethodType>
+        MethodType* asMethod() {
+            return dynamic_cast<MethodType*>(getBody().get());
+        }
+
+        Sender from;
+    };
+    typedef std::vector<Frame> Conversation;
+
+    InProcessBroker(const framing::ProtocolVersion& ver) :
+        Connector(ver),
+        protocolInit(ver),
+        broker(broker::Broker::create()),
+        brokerOut(BROKER, conversation),
+        brokerConnection(&brokerOut, *broker),
+        clientOut(CLIENT, conversation, &brokerConnection)
+    {}
+
+    void connect(const std::string& /*host*/, int /*port*/) {}
+    void init() { brokerConnection.initiated(&protocolInit); }
+    void close() {}
+
+    /** Client's input handler. */
+    void setInputHandler(framing::InputHandler* handler) {
+        brokerOut.in = handler;
+    }
+
+    /** Called by client to send a frame */
+    void send(framing::AMQFrame* frame) {
+        clientOut.send(frame);
+    }
+
+    /** Entire client-broker conversation is recorded here */
+    Conversation conversation;
+
+  private:
+    /** OutputHandler that forwards data to an InputHandler */
+    struct OutputToInputHandler : public sys::ConnectionOutputHandler {
+        OutputToInputHandler(
+            Sender from_, Conversation& conversation_,
+            framing::InputHandler* ih=0
+        ) : from(from_), conversation(conversation_), in(ih) {}
+
+        void send(framing::AMQFrame* frame) {
+            conversation.push_back(Frame(from, copy(*frame)));
+            in->received(frame);
+        }
+
+        void close() {}
+        
+        Sender from;
+        Conversation& conversation;
+        framing::InputHandler* in;
+    };
+
+    framing::ProtocolInitiation protocolInit;
+    Broker::shared_ptr  broker;
+    OutputToInputHandler brokerOut;
+    broker::Connection brokerConnection;
+    OutputToInputHandler clientOut;
+};
+
+std::ostream& operator<<(
+    std::ostream& out, const InProcessBroker::Frame& frame)
+{
+    return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
+        static_cast<const framing::AMQFrame&>(frame);
+}
+std::ostream& operator<<(
+    std::ostream& out, const InProcessBroker::Conversation& conv)
+{
+    for (InProcessBroker::Conversation::const_iterator i = conv.begin();
+         i != conv.end(); ++i)
+    {
+        out << *i << std::endl;
+    }
+    return out;
+}
+
+
+}} // namespace qpid::broker
+
+#endif  /*!_tests_InProcessBroker_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/InProcessBroker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date