You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/02 23:03:12 UTC

svn commit: r502767 [1/2] - in /incubator/qpid/branches/qpid.0-9: cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/framing/ cpp/tests/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/

Author: aconway
Date: Fri Feb  2 14:03:10 2007
New Revision: 502767

URL: http://svn.apache.org/viewvc?view=rev&rev=502767
Log:

* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
  ChannelAdapter and Method Body. Request ID comes from body,
  ChannelAdapter is used to send frames, not OutputHandler.

* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
  Context is per-method not per-channel.

* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
  with MethodContext (for responses) or ChannelAdapter (for requests.)
  Use context request-ID to construct responses, send all bodies via
  ChannelAdapter.

* cpp/lib/broker/BrokerAdapter.cpp:  Link broker::Channel to BrokerAdapter.

* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
  Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&

* Cosmetic changes, many files:
 - fixed indentation, broke long lines.
 - removed unnecessary qpid:: prefixes.

* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
  broker::channel.

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
    incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
    incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Fri Feb  2 14:03:10 2007
@@ -54,7 +54,7 @@
     factory(*this)
 {
     if (config.getStore().empty())
-        store.reset(new NullMessageStore());
+        store.reset(new NullMessageStore(config.isTrace()));
     else
         store.reset(new MessageStoreModule(config.getStore()));
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Fri Feb  2 14:03:10 2007
@@ -28,168 +28,20 @@
 using namespace qpid;
 using namespace qpid::framing;
 
-typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+typedef std::vector<Queue::shared_ptr> QueueVector;
 
-class BrokerAdapter::ServerOps : public AMQP_ServerOperations
-{
-  public:
-    ServerOps(Channel& ch, Connection& c, Broker& b) :
-        basicHandler(ch, c, b),
-        channelHandler(ch, c, b),
-        connectionHandler(ch, c, b),
-        exchangeHandler(ch, c, b),
-        messageHandler(ch, c, b),
-        queueHandler(ch, c, b),
-        txHandler(ch, c, b)    
-    {}
-    
-    ChannelHandler* getChannelHandler() { return &channelHandler; }
-    ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
-    BasicHandler* getBasicHandler() { return &basicHandler; }
-    ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
-    QueueHandler* getQueueHandler() { return &queueHandler; }
-    TxHandler* getTxHandler() { return &txHandler;  }
-    MessageHandler* getMessageHandler() { return &messageHandler;  }
-    AccessHandler* getAccessHandler() {
-        throw ConnectionException(540, "Access class not implemented");  }
-    FileHandler* getFileHandler() {
-        throw ConnectionException(540, "File class not implemented");  }
-    StreamHandler* getStreamHandler() {
-        throw ConnectionException(540, "Stream class not implemented");  }
-    DtxHandler* getDtxHandler() {
-        throw ConnectionException(540, "Dtx class not implemented");  }
-    TunnelHandler* getTunnelHandler() {
-        throw ConnectionException(540, "Tunnel class not implemented"); }
-
-  private:
-    struct CoreRefs {
-        CoreRefs(Channel& ch, Connection& c, Broker& b)
-            : channel(ch), connection(c), broker(b) {}
-
-        Channel& channel;
-        Connection& connection;
-        Broker& broker;
-    };
-    
-    class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
-      public:
-        ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
-        void startOk(const MethodContext& context,
-                     const qpid::framing::FieldTable& clientProperties,
-                     const std::string& mechanism, const std::string& response,
-                     const std::string& locale); 
-        void secureOk(const MethodContext& context, const std::string& response); 
-        void tuneOk(const MethodContext& context, u_int16_t channelMax,
-                    u_int32_t frameMax, u_int16_t heartbeat); 
-        void open(const MethodContext& context, const std::string& virtualHost,
-                  const std::string& capabilities, bool insist); 
-        void close(const MethodContext& context, u_int16_t replyCode,
-                   const std::string& replyText,
-                   u_int16_t classId, u_int16_t methodId); 
-        void closeOk(const MethodContext& context); 
-    };
-
-    class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
-      public:
-        ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-        void open(const MethodContext& context, const std::string& outOfBand); 
-        void flow(const MethodContext& context, bool active); 
-        void flowOk(const MethodContext& context, bool active); 
-        void ok( const MethodContext& context );
-        void ping( const MethodContext& context );
-        void pong( const MethodContext& context );
-        void resume( const MethodContext& context, const std::string& channelId );
-        void close(const MethodContext& context, u_int16_t replyCode, const
-                   std::string& replyText, u_int16_t classId, u_int16_t methodId); 
-        void closeOk(const MethodContext& context); 
-    };
-    
-    class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
-      public:
-        ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-        void declare(const MethodContext& context, u_int16_t ticket,
-                     const std::string& exchange, const std::string& type, 
-                     bool passive, bool durable, bool autoDelete,
-                     bool internal, bool nowait, 
-                     const qpid::framing::FieldTable& arguments); 
-        void delete_(const MethodContext& context, u_int16_t ticket,
-                     const std::string& exchange, bool ifUnused, bool nowait); 
-    };
-
-    class QueueHandlerImpl : private CoreRefs, public QueueHandler{
-      public:
-        QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-        void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue, 
-                     bool passive, bool durable, bool exclusive, 
-                     bool autoDelete, bool nowait,
-                     const qpid::framing::FieldTable& arguments); 
-        void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue, 
-                  const std::string& exchange, const std::string& routingKey,
-                  bool nowait, const qpid::framing::FieldTable& arguments); 
-        void unbind(const MethodContext& context,
-                    u_int16_t ticket,
-                    const std::string& queue,
-                    const std::string& exchange,
-                    const std::string& routingKey,
-                    const qpid::framing::FieldTable& arguments );
-        void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue, 
-                   bool nowait); 
-        void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue,
-                     bool ifUnused, bool ifEmpty, 
-                     bool nowait);
-    };
-
-    class BasicHandlerImpl : private CoreRefs, public BasicHandler{
-      public:
-        BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-        void qos(const MethodContext& context, u_int32_t prefetchSize,
-                 u_int16_t prefetchCount, bool global); 
-        void consume(
-            const MethodContext& context, u_int16_t ticket, const std::string& queue,
-            const std::string& consumerTag, bool noLocal, bool noAck,
-            bool exclusive, bool nowait,
-            const qpid::framing::FieldTable& fields); 
-        void cancel(const MethodContext& context, const std::string& consumerTag,
-                    bool nowait); 
-        void publish(const MethodContext& context, u_int16_t ticket,
-                     const std::string& exchange, const std::string& routingKey, 
-                     bool mandatory, bool immediate); 
-        void get(const MethodContext& context, u_int16_t ticket, const std::string& queue,
-                 bool noAck); 
-        void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple); 
-        void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue); 
-        void recover(const MethodContext& context, bool requeue); 
-    };
-
-    class TxHandlerImpl : private CoreRefs, public TxHandler{
-      public:
-        TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-        void select(const MethodContext& context);
-        void commit(const MethodContext& context);
-        void rollback(const MethodContext& context);
-    };
-
-    BasicHandlerImpl basicHandler;
-    ChannelHandlerImpl channelHandler;
-    ConnectionHandlerImpl connectionHandler;
-    ExchangeHandlerImpl exchangeHandler;
-    MessageHandlerImpl messageHandler;
-    QueueHandlerImpl queueHandler;
-    TxHandlerImpl txHandler;
-
-};
-
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
-    const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, 
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
+    const MethodContext& context , const FieldTable& /*clientProperties*/,
+    const string& /*mechanism*/, 
     const string& /*response*/, const string& /*locale*/){
     connection.client->getConnection().tune(
         context, 100, connection.getFrameMax(), connection.getHeartbeat());
 }
         
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+    const MethodContext&, const string& /*response*/){}
         
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
     const MethodContext&, u_int16_t /*channelmax*/,
     u_int32_t framemax, u_int16_t heartbeat)
 {
@@ -197,12 +49,12 @@
     connection.setHeartbeat(heartbeat);
 }
         
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
     string knownhosts;
     connection.client->getConnection().openOk(context, knownhosts);
 }
         
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
     const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, 
     u_int16_t /*classId*/, u_int16_t /*methodId*/)
 {
@@ -210,21 +62,21 @@
     connection.getOutput().close();
 } 
         
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
     connection.getOutput().close();
 } 
               
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
     const MethodContext& context, const string& /*outOfBand*/){
     channel.open();
     // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
     connection.client->getChannel().openOk(context, std::string()/* ID */);
 } 
         
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}         
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} 
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}         
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} 
         
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
     const MethodContext& context, u_int16_t /*replyCode*/,
     const string& /*replyText*/,
     u_int16_t /*classId*/, u_int16_t /*methodId*/)
@@ -234,13 +86,13 @@
     connection.closeChannel(channel.getId()); 
 } 
         
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){} 
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} 
               
 
 
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, 
-                                                            bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
-                                                            const FieldTable& /*arguments*/){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, 
+                                                                bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
+                                                                const FieldTable& /*arguments*/){
 
     if(passive){
         if(!broker.getExchanges().get(exchange)) {
@@ -265,17 +117,17 @@
     }
 }
                 
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, 
-                                                            const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, 
+                                                                const string& exchange, bool /*ifUnused*/, bool nowait){
 
     //TODO: implement unused
     broker.getExchanges().destroy(exchange);
     if(!nowait) connection.client->getExchange().deleteOk(context);
 } 
 
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, 
-                                                         bool passive, bool durable, bool exclusive, 
-                                                         bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, 
+                                                             bool passive, bool durable, bool exclusive, 
+                                                             bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
 	queue = connection.getQueue(name, channel.getId());
@@ -308,9 +160,9 @@
     }
 } 
         
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, 
-                                                      const string& exchangeName, const string& routingKey, bool nowait, 
-                                                      const FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, 
+                                                          const string& exchangeName, const string& routingKey, bool nowait, 
+                                                          const FieldTable& arguments){
 
     Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -325,7 +177,7 @@
 }
  
 void 
-BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
+BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
     const MethodContext& context,
     u_int16_t /*ticket*/,
     const string& queueName,
@@ -344,15 +196,15 @@
     connection.client->getQueue().unbindOk(context);    
 }
         
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
 
     Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
     int count = queue->purge();
     if(!nowait) connection.client->getQueue().purgeOk(context, count);
 } 
         
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, 
-                                                         bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, 
+                                                             bool ifUnused, bool ifEmpty, bool nowait){
     ChannelException error(0, "");
     int count(0);
     Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -363,7 +215,7 @@
     }else{
         //remove the queue from the list of exclusive queues if necessary
         if(q->isExclusiveOwner(&connection)){
-            queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+            QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
             if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
         }
         count = q->getMessageCount();
@@ -377,14 +229,14 @@
         
 
 
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
     //TODO: handle global
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
     connection.client->getBasic().qosOk(context);
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
     const MethodContext& context, u_int16_t /*ticket*/, 
     const string& queueName, const string& consumerTag, 
     bool noLocal, bool noAck, bool exclusive, 
@@ -412,19 +264,23 @@
 
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
     channel.cancel(consumerTag);
 
     if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, 
-                                                         const string& exchangeName, const string& routingKey, 
-                                                         bool mandatory, bool immediate){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+    const MethodContext& context, u_int16_t /*ticket*/, 
+    const string& exchangeName, const string& routingKey, 
+    bool mandatory, bool immediate)
+{
 
     Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
     if(exchange){
-        BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+        BasicMessage* msg = new BasicMessage(
+            &connection, exchangeName, routingKey, mandatory, immediate,
+            context.methodBody);
         channel.handlePublish(msg, exchange);
     }else{
         throw ChannelException(
@@ -432,7 +288,7 @@
     }
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
     Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());    
     if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
@@ -441,7 +297,7 @@
     }
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
     try{
         channel.ack(deliveryTag, multiple);
     }catch(InvalidAckException& e){
@@ -449,23 +305,23 @@
     }
 } 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
         
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
     channel.recover(requeue);
 } 
 
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
     channel.begin();
     connection.client->getTx().selectOk(context);
 }
 
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
     channel.commit();
     connection.client->getTx().commitOk(context);
 }
 
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
     
     channel.rollback();
     connection.client->getTx().rollbackOk(context);
@@ -473,82 +329,32 @@
 }
               
 void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
 {
     //no specific action required, generic response handling should be sufficient
 }
 
 void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
 {
     connection.client->getChannel().ok(context);
     connection.client->getChannel().pong(context);
 }
 
 void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
 {
     connection.client->getChannel().ok(context);
 }
 
 void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
     const MethodContext&,
     const string& /*channel*/ )
 {
     assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
-BrokerAdapter::BrokerAdapter(
-    std::auto_ptr<Channel> ch, Connection& c, Broker& b
-) :
-    channel(ch),
-    connection(c),
-    broker(b),
-    serverOps(new ServerOps(*channel,c,b))
-{
-    init(channel->getId(), c.getOutput(), channel->getVersion());
-}
-
-void BrokerAdapter::handleMethodInContext(
-    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
-    const MethodContext& context
-)
-{
-    try{
-        method->invoke(*serverOps, context);
-    }catch(ChannelException& e){
-        connection.client->getChannel().close(
-            context, e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-        connection.closeChannel(getId());
-    }catch(ConnectionException& e){
-        connection.client->getConnection().close(
-            context, e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-    }catch(std::exception& e){
-        connection.client->getConnection().close(
-            context, 541/*internal error*/, e.what(),
-            method->amqpClassId(), method->amqpMethodId());
-    }
-}
-
-void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
-    channel->handleHeader(body);
-}
-
-void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
-    channel->handleContent(body);
-}
-
-void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
-    // TODO aconway 2007-01-17: Implement heartbeats.
-}
-
-
-bool BrokerAdapter::isOpen() const {
-    return channel->isOpen();
-}
 
 }} // namespace qpid::broker
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Fri Feb  2 14:03:10 2007
@@ -18,18 +18,14 @@
  * limitations under the License.
  *
  */
-#include <memory.h>
-
 #include "AMQP_ServerOperations.h"
-#include "BodyHandler.h"
+#include "MessageHandlerImpl.h"
 #include "BrokerChannel.h"
-#include "amqp_types.h"
-#include "framing/ChannelAdapter.h"
 
 namespace qpid {
 namespace broker {
 
-class AMQMethodBody;
+class Channel;
 class Connection;
 class Broker;
 
@@ -38,35 +34,173 @@
  *
  * Translates protocol bodies into calls on the core Channel,
  * Connection and Broker objects.
- *
- * Owns a channel, has references to Connection and Broker.
  */
-class BrokerAdapter : public framing::ChannelAdapter
-{
-  public:
-    BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&);
-    Channel& getChannel() { return *channel; }
 
-    void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
-    void handleContent(boost::shared_ptr<framing::AMQContentBody>);
-    void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+class ChannelHandler;
+class ConnectionHandler;
+class BasicHandler;
+class ExchangeHandler;
+class QueueHandler;
+class TxHandler;
+class MessageHandler;
+class AccessHandler;
+class FileHandler;
+class StreamHandler;
+class DtxHandler;
+class TunnelHandler;
 
-    bool isOpen() const;
+class BrokerAdapter : public framing::AMQP_ServerOperations
+{
+  public:
+    BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+        basicHandler(ch, c, b),
+        channelHandler(ch, c, b),
+        connectionHandler(ch, c, b),
+        exchangeHandler(ch, c, b),
+        messageHandler(ch, c, b),
+        queueHandler(ch, c, b),
+        txHandler(ch, c, b)    
+    {}
     
+    ChannelHandler* getChannelHandler() { return &channelHandler; }
+    ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+    BasicHandler* getBasicHandler() { return &basicHandler; }
+    ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+    QueueHandler* getQueueHandler() { return &queueHandler; }
+    TxHandler* getTxHandler() { return &txHandler;  }
+    MessageHandler* getMessageHandler() { return &messageHandler;  }
+    AccessHandler* getAccessHandler() {
+        throw ConnectionException(540, "Access class not implemented");  }
+    FileHandler* getFileHandler() {
+        throw ConnectionException(540, "File class not implemented");  }
+    StreamHandler* getStreamHandler() {
+        throw ConnectionException(540, "Stream class not implemented");  }
+    DtxHandler* getDtxHandler() {
+        throw ConnectionException(540, "Dtx class not implemented");  }
+    TunnelHandler* getTunnelHandler() {
+        throw ConnectionException(540, "Tunnel class not implemented"); }
+
   private:
-    void handleMethodInContext(
-        boost::shared_ptr<framing::AMQMethodBody> method,
-        const framing::MethodContext& context);
+    struct CoreRefs {
+        CoreRefs(Channel& ch, Connection& c, Broker& b)
+            : channel(ch), connection(c), broker(b) {}
+
+        Channel& channel;
+        Connection& connection;
+        Broker& broker;
+    };
     
-    class ServerOps;
+    class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+      public:
+        ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+        void startOk(const framing::MethodContext& context,
+                     const qpid::framing::FieldTable& clientProperties,
+                     const std::string& mechanism, const std::string& response,
+                     const std::string& locale); 
+        void secureOk(const framing::MethodContext& context,
+                      const std::string& response); 
+        void tuneOk(const framing::MethodContext& context,
+                    u_int16_t channelMax,
+                    u_int32_t frameMax, u_int16_t heartbeat); 
+        void open(const framing::MethodContext& context,
+                  const std::string& virtualHost,
+                  const std::string& capabilities, bool insist); 
+        void close(const framing::MethodContext& context, u_int16_t replyCode,
+                   const std::string& replyText,
+                   u_int16_t classId, u_int16_t methodId); 
+        void closeOk(const framing::MethodContext& context); 
+    };
+
+    class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+      public:
+        ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void open(const framing::MethodContext& context, const std::string& outOfBand); 
+        void flow(const framing::MethodContext& context, bool active); 
+        void flowOk(const framing::MethodContext& context, bool active); 
+        void ok( const framing::MethodContext& context );
+        void ping( const framing::MethodContext& context );
+        void pong( const framing::MethodContext& context );
+        void resume( const framing::MethodContext& context, const std::string& channelId );
+        void close(const framing::MethodContext& context, u_int16_t replyCode, const
+                   std::string& replyText, u_int16_t classId, u_int16_t methodId); 
+        void closeOk(const framing::MethodContext& context); 
+    };
+    
+    class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+      public:
+        ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void declare(const framing::MethodContext& context, u_int16_t ticket,
+                     const std::string& exchange, const std::string& type, 
+                     bool passive, bool durable, bool autoDelete,
+                     bool internal, bool nowait, 
+                     const qpid::framing::FieldTable& arguments); 
+        void delete_(const framing::MethodContext& context, u_int16_t ticket,
+                     const std::string& exchange, bool ifUnused, bool nowait); 
+    };
+
+    class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+      public:
+        QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, 
+                     bool passive, bool durable, bool exclusive, 
+                     bool autoDelete, bool nowait,
+                     const qpid::framing::FieldTable& arguments); 
+        void bind(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, 
+                  const std::string& exchange, const std::string& routingKey,
+                  bool nowait, const qpid::framing::FieldTable& arguments); 
+        void unbind(const framing::MethodContext& context,
+                    u_int16_t ticket,
+                    const std::string& queue,
+                    const std::string& exchange,
+                    const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+        void purge(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, 
+                   bool nowait); 
+        void delete_(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+                     bool ifUnused, bool ifEmpty, 
+                     bool nowait);
+    };
+
+    class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+      public:
+        BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void qos(const framing::MethodContext& context, u_int32_t prefetchSize,
+                 u_int16_t prefetchCount, bool global); 
+        void consume(
+            const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+            const std::string& consumerTag, bool noLocal, bool noAck,
+            bool exclusive, bool nowait,
+            const qpid::framing::FieldTable& fields); 
+        void cancel(const framing::MethodContext& context, const std::string& consumerTag,
+                    bool nowait); 
+        void publish(const framing::MethodContext& context, u_int16_t ticket,
+                     const std::string& exchange, const std::string& routingKey, 
+                     bool mandatory, bool immediate); 
+        void get(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+                 bool noAck); 
+        void ack(const framing::MethodContext& context, u_int64_t deliveryTag, bool multiple); 
+        void reject(const framing::MethodContext& context, u_int64_t deliveryTag, bool requeue); 
+        void recover(const framing::MethodContext& context, bool requeue); 
+    };
+
+    class TxHandlerImpl : private CoreRefs, public TxHandler{
+      public:
+        TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void select(const framing::MethodContext& context);
+        void commit(const framing::MethodContext& context);
+        void rollback(const framing::MethodContext& context);
+    };
+
+    BasicHandlerImpl basicHandler;
+    ChannelHandlerImpl channelHandler;
+    ConnectionHandlerImpl connectionHandler;
+    ExchangeHandlerImpl exchangeHandler;
+    MessageHandlerImpl messageHandler;
+    QueueHandlerImpl queueHandler;
+    TxHandlerImpl txHandler;
 
-    std::auto_ptr<Channel> channel;
-    Connection& connection;
-    Broker& broker;
-    boost::shared_ptr<ServerOps> serverOps;
 };
-  
-
 }} // namespace qpid::broker
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Fri Feb  2 14:03:10 2007
@@ -18,12 +18,25 @@
  * under the License.
  *
  */
+#include <assert.h>
+
 #include <iostream>
 #include <sstream>
-#include <assert.h>
+#include <algorithm>
+#include <functional>
 
-#include <BrokerChannel.h>
+#include "BrokerChannel.h"
+#include "DeletingTxOp.h"
+#include "framing/ChannelAdapter.h"
 #include <QpidError.h>
+#include <DeliverableMessage.h>
+#include <BrokerQueue.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <TxAck.h>
+#include <TxPublish.h>
+#include "BrokerAdapter.h"
+#include "Connection.h"
 
 using std::mem_fun_ref;
 using std::bind2nd;
@@ -33,12 +46,12 @@
 
 
 Channel::Channel(
-    const ProtocolVersion& _version, OutputHandler* _out, int _id,
+    Connection& con, ChannelId id,
     u_int32_t _framesize, MessageStore* const _store,
     u_int64_t _stagingThreshold
 ) :
-    id(_id),
-    out(*_out),
+    ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+    connection(con),
     currentDeliveryTag(1),
     transactional(false),
     prefetchSize(0),
@@ -47,8 +60,8 @@
     tagGenerator("sgen"),
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
-    version(_version),
-    opened(false)
+    opened(true),
+    adapter(new BrokerAdapter(*this, con, con.broker))
 {
     outstanding.reset();
 }
@@ -61,7 +74,10 @@
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) {
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+                      bool exclusive, ConnectionToken* const connection,
+                      const FieldTable*)
+{
 	if(tag.empty()) tag = tagGenerator.generate();
     ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
@@ -117,7 +133,10 @@
     accumulatedAck.clear();
 }
 
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+void Channel::deliver(
+    Message::shared_ptr& msg, const string& consumerTag,
+    Queue::shared_ptr& queue, bool ackExpected)
+{
     Mutex::ScopedLock locker(deliveryLock);
 
     u_int64_t deliveryTag = currentDeliveryTag++;
@@ -127,7 +146,7 @@
         outstanding.count++;
     }
     //send deliver method, header and content(s)
-    msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+    msg->deliver(*this, consumerTag, deliveryTag, framesize);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -184,7 +203,7 @@
     messageBuilder.addContent(content);
 }
 
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
     // TODO aconway 2007-01-17: Implement heartbeating.
 }
 
@@ -255,7 +274,9 @@
     if(msg){
         Mutex::ScopedLock locker(deliveryLock);
         u_int64_t myDeliveryTag = currentDeliveryTag++;
-        msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+        msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+                       queue->getMessageCount() + 1, myDeliveryTag,
+                       framesize);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -265,7 +286,32 @@
     }
 }
 
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
-    msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+                      u_int64_t deliveryTag)
+{
+    msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+void Channel::handleMethodInContext(
+    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+    const MethodContext& context
+)
+{
+    try{
+        method->invoke(*adapter, context);
+    }catch(ChannelException& e){
+        connection.client->getChannel().close(
+            context, e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+        connection.closeChannel(getId());
+    }catch(ConnectionException& e){
+        connection.client->getConnection().close(
+            context, e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        connection.client->getConnection().close(
+            context, 541/*internal error*/, e.what(),
+            method->amqpClassId(), method->amqpMethodId());
+    }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Fri Feb  2 14:03:10 2007
@@ -22,42 +22,39 @@
  *
  */
 
-#include <algorithm>
-#include <functional>
 #include <list>
 #include <map>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
 #include <AccumulatedAck.h>
-#include <Binding.h>
 #include <Consumer.h>
-#include <DeletingTxOp.h>
-#include <DeliverableMessage.h>
 #include <DeliveryRecord.h>
-#include <BrokerMessage.h>
 #include <MessageBuilder.h>
 #include <NameGenerator.h>
 #include <Prefetch.h>
-#include <BrokerQueue.h>
-#include <MessageStore.h>
-#include <TxAck.h>
 #include <TxBuffer.h>
-#include <TxPublish.h>
-#include <sys/Monitor.h>
-#include <OutputHandler.h>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include <AMQHeartbeatBody.h>
-#include <BasicPublishBody.h>
+#include "framing/ChannelAdapter.h"
 
 namespace qpid {
 namespace broker {
 
-using qpid::framing::string;
+class ConnectionToken;
+class Connection;
+class Queue;
+class BrokerAdapter;
+
+using framing::string;
 
 /**
  * Maintains state for an AMQP channel. Handles incoming and
  * outgoing messages for that channel.
  */
-class Channel : private MessageBuilder::CompletionHandler {
+class Channel :
+        public framing::ChannelAdapter,
+        private MessageBuilder::CompletionHandler
+{
     class ConsumerImpl : public virtual Consumer
     {
         Channel* parent;
@@ -67,15 +64,18 @@
         const bool ackExpected;
         bool blocked;
       public:
-        ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+        ConsumerImpl(Channel* parent, const string& tag,
+                     Queue::shared_ptr queue,
+                     ConnectionToken* const connection, bool ack);
         virtual bool deliver(Message::shared_ptr& msg);            
         void cancel();
         void requestDispatch();
     };
 
     typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+    Connection& connection;
     u_int16_t id;
-    qpid::framing::OutputHandler& out;
     u_int64_t currentDeliveryTag;
     Queue::shared_ptr defaultQueue;
     bool transactional;
@@ -86,30 +86,32 @@
     u_int32_t framesize;
     NameGenerator tagGenerator;
     std::list<DeliveryRecord> unacked;
-    qpid::sys::Mutex deliveryLock;
+    sys::Mutex deliveryLock;
     TxBuffer txBuffer;
     AccumulatedAck accumulatedAck;
     MessageStore* const store;
     MessageBuilder messageBuilder;//builder for in-progress message
     Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
-    qpid::framing::ProtocolVersion version; // version used for this channel
     bool opened;
 
+    boost::scoped_ptr<BrokerAdapter> adapter;
+
     virtual void complete(Message::shared_ptr& msg);
     void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);            
     void cancel(consumer_iterator consumer);
     bool checkPrefetch(Message::shared_ptr& msg);
         
   public:
-    Channel(
-        const qpid::framing::ProtocolVersion& _version,
-        qpid::framing::OutputHandler* out, int id, u_int32_t framesize, 
-        MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+    Channel(Connection& channel,
+            framing::ChannelId id,
+            u_int32_t framesize, 
+            MessageStore* const _store = 0,
+            u_int64_t stagingThreshold = 0);
+    
     ~Channel();
+
     bool isOpen() const { return opened; }
-    const framing::ProtocolVersion& getVersion() const { return version; }
     void open() { opened = true; }
-    u_int16_t getId() const { return id; }
     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
     Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
     u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
@@ -118,7 +120,7 @@
     bool exists(const string& consumerTag);
     void consume(string& tag, Queue::shared_ptr queue, bool acks,
                  bool exclusive, ConnectionToken* const connection = 0,
-                 const qpid::framing::FieldTable* = 0);
+                 const framing::FieldTable* = 0);
     void cancel(const string& tag);
     bool get(Queue::shared_ptr queue, bool ackExpected);
     void begin();
@@ -129,14 +131,18 @@
     void recover(bool requeue);
     void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);            
     void handlePublish(Message* msg, Exchange::shared_ptr exchange);
-    void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr);
-    void handleContent(qpid::framing::AMQContentBody::shared_ptr);
-    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr);
+    void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
+    void handleContent(boost::shared_ptr<framing::AMQContentBody>);
+    void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+    void handleMethodInContext(
+        boost::shared_ptr<framing::AMQMethodBody> method,
+        const framing::MethodContext& context);
+    
 };
 
 struct InvalidAckException{};
 
-}} // namespace qpid::broker
+}} // namespace broker
 
 
 #endif  /*!_broker_BrokerChannel_h*/

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Fri Feb  2 14:03:10 2007
@@ -27,31 +27,35 @@
 #include <BasicDeliverBody.h>
 #include <BasicGetOkBody.h>
 #include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
 
 using namespace boost;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-BasicMessage::BasicMessage(const ConnectionToken* const _publisher, 
-                 const string& _exchange, const string& _routingKey, 
-                 bool _mandatory, bool _immediate) :
-	Message(_exchange, _routingKey, _mandatory, _immediate),
-	publisher(_publisher),
-	size(0)
+BasicMessage::BasicMessage(
+    const ConnectionToken* const _publisher, 
+    const string& _exchange, const string& _routingKey, 
+    bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+) :
+    Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
+    publisher(_publisher),
+    size(0)
 {
 }
 
-BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : 
-    publisher(0), size(0)
-{
+// FIXME aconway 2007-02-01: remove.
+// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : 
+//     publisher(0), size(0)
+// {
 
-    decode(buffer, headersOnly, contentChunkSize);
-}
+//     decode(buffer, headersOnly, contentChunkSize);
+// }
 
+// For tests only.
 BasicMessage::BasicMessage() : publisher(0), size(0)
-{
-}
+{}
 
 BasicMessage::~BasicMessage(){
     if (content.get()) content->destroy();
@@ -73,34 +77,42 @@
     return header.get() && (header->getContentSize() == contentSize());
 }
 
-void BasicMessage::deliver(OutputHandler* out, int channel, 
-                      const string& consumerTag, u_int64_t deliveryTag, 
-                      u_int32_t framesize,
-		      ProtocolVersion* version){
-    // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
-    out->send(new AMQFrame(*version, channel,
-    	new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
-    sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendGetOk(OutputHandler* out, 
-         int channel, 
-         u_int32_t messageCount,
-         u_int64_t deliveryTag, 
-         u_int32_t framesize,
-	ProtocolVersion* version){
-    // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
-	out->send(new AMQFrame(*version, channel,
-     	new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
-    sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
-    AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
-    out->send(new AMQFrame(*version, channel, headerBody));
+void BasicMessage::deliver(ChannelAdapter& channel, 
+                           const string& consumerTag, u_int64_t deliveryTag, 
+                           u_int32_t framesize)
+{
+    // CCT -- TODO - Update code generator to take pointer/ not
+    // instance to avoid extra contruction
+    channel.send(
+    	new BasicDeliverBody(
+            channel.getVersion(), consumerTag, deliveryTag,
+            getRedelivered(), getExchange(), getRoutingKey()));
+    sendContent(channel, framesize);
+}
+
+void BasicMessage::sendGetOk(const MethodContext& context, 
+                             u_int32_t messageCount,
+                             u_int64_t deliveryTag, 
+                             u_int32_t framesize)
+{
+    // CCT -- TODO - Update code generator to take pointer/ not
+    // instance to avoid extra contruction
+    context.channel->send(
+        new BasicGetOkBody(
+            context.channel->getVersion(),
+            context.methodBody->getRequestId(),
+            deliveryTag, getRedelivered(), getExchange(),
+            getRoutingKey(), messageCount)); 
+    sendContent(*context.channel, framesize);
+}
 
+void BasicMessage::sendContent(
+    ChannelAdapter& channel, u_int32_t framesize)
+{
+    channel.send(header);
     Mutex::ScopedLock locker(contentLock);
-    if (content.get()) content->send(*version, out, channel, framesize);
+    if (content.get())
+        content->send(channel,  framesize);
 }
 
 BasicHeaderProperties* BasicMessage::getHeaderProperties(){
@@ -126,8 +138,8 @@
 
 void BasicMessage::decodeHeader(Buffer& buffer)
 {
-	string exchange;
-	string routingKey;
+    string exchange;
+    string routingKey;
 
     buffer.getShortString(exchange);
     buffer.getShortString(routingKey);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Fri Feb  2 14:03:10 2007
@@ -27,109 +27,111 @@
 #include <boost/shared_ptr.hpp>
 #include <AMQContentBody.h>
 #include <AMQHeaderBody.h>
-#include <ProtocolVersion.h>
+#include "AMQMethodBody.h"
 #include <BasicHeaderProperties.h>
 #include <ConnectionToken.h>
 #include <Content.h>
-#include <OutputHandler.h>
 #include <Mutex.h>
 #include <TxBuffer.h>
 
 namespace qpid {
-    namespace broker {
 
-        class MessageStore;
-        using qpid::framing::string;
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+}
+
+namespace broker {
+
+class MessageStore;
+using framing::string;
 	
-        /**
-         * Represents an AMQP message, i.e. a header body, a list of
-         * content bodies and some details about the publication
-         * request.
-         */
-        class BasicMessage : public Message{
-            const ConnectionToken* const publisher;
-            qpid::framing::AMQHeaderBody::shared_ptr header;
-            std::auto_ptr<Content> content;
-            qpid::sys::Mutex contentLock;
-            u_int64_t size;
-
-            void sendContent(qpid::framing::OutputHandler* out, 
-                             int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
-
-        public:
-            typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
-            BasicMessage(const ConnectionToken* const publisher, 
-                    const string& exchange, const string& routingKey, 
-                    bool mandatory, bool immediate);
-            BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
-            BasicMessage();
-            ~BasicMessage();
-            void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
-            void addContent(qpid::framing::AMQContentBody::shared_ptr data);
-            bool isComplete();
-            const ConnectionToken* const getPublisher();
-
-            void deliver(qpid::framing::OutputHandler* out, 
-                         int channel, 
-                         const string& consumerTag, 
-                         u_int64_t deliveryTag, 
-                         u_int32_t framesize,
-			 qpid::framing::ProtocolVersion* version);
-            void sendGetOk(qpid::framing::OutputHandler* out, 
-                           int channel, 
-                           u_int32_t messageCount,
-                           u_int64_t deliveryTag, 
-                           u_int32_t framesize,
-			   qpid::framing::ProtocolVersion* version);
-
-            qpid::framing::BasicHeaderProperties* getHeaderProperties();
-            bool isPersistent();
-            u_int64_t contentSize() const { return size; }
-
-            void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
-            void decodeHeader(qpid::framing::Buffer& buffer);
-            void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
-
-            void encode(qpid::framing::Buffer& buffer);
-            void encodeHeader(qpid::framing::Buffer& buffer);
-            void encodeContent(qpid::framing::Buffer& buffer);
-            /**
-             * @returns the size of the buffer needed to encode this
-             * message in its entirety
-             */
-            u_int32_t encodedSize();
-            /**
-             * @returns the size of the buffer needed to encode the
-             * 'header' of this message (not just the header frame,
-             * but other meta data e.g.routing key and exchange)
-             */
-            u_int32_t encodedHeaderSize();
-            /**
-             * @returns the size of the buffer needed to encode the
-             * (possibly partial) content held by this message
-             */
-            u_int32_t encodedContentSize();
-            /**
-             * Releases the in-memory content data held by this
-             * message. Must pass in a store from which the data can
-             * be reloaded.
-             */
-            void releaseContent(MessageStore* store);
-            /**
-             * If headers have been received, returns the expected
-             * content size else returns 0.
-             */
-            u_int64_t expectedContentSize();
-            /**
-             * Sets the 'content' implementation of this message (the
-             * message controls the lifecycle of the content instance
-             * it uses).
-             */
-            void setContent(std::auto_ptr<Content>& content);
-        };
+/**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+class BasicMessage : public Message {
+    const ConnectionToken* const publisher;
+    framing::AMQHeaderBody::shared_ptr header;
+    std::auto_ptr<Content> content;
+    sys::Mutex contentLock;
+    u_int64_t size;
+
+    void sendContent(framing::ChannelAdapter&, u_int32_t framesize);
+
+  public:
+    typedef boost::shared_ptr<BasicMessage> shared_ptr;
+
+    BasicMessage(const ConnectionToken* const publisher, 
+                 const string& exchange, const string& routingKey, 
+                 bool mandatory, bool immediate,
+                 framing::AMQMethodBody::shared_ptr respondTo);
+    BasicMessage();
+    ~BasicMessage();
+    void setHeader(framing::AMQHeaderBody::shared_ptr header);
+    void addContent(framing::AMQContentBody::shared_ptr data);
+    bool isComplete();
+    const ConnectionToken* const getPublisher();
+
+    void deliver(framing::ChannelAdapter&, 
+                 const string& consumerTag, 
+                 u_int64_t deliveryTag, 
+                 u_int32_t framesize);
+    
+    void sendGetOk(const framing::MethodContext&, 
+                   u_int32_t messageCount,
+                   u_int64_t deliveryTag, 
+                   u_int32_t framesize);
+
+    framing::BasicHeaderProperties* getHeaderProperties();
+    bool isPersistent();
+    u_int64_t contentSize() const { return size; }
+
+    void decode(framing::Buffer& buffer, bool headersOnly = false,
+                u_int32_t contentChunkSize = 0);
+    void decodeHeader(framing::Buffer& buffer);
+    void decodeContent(framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
+
+    void encode(framing::Buffer& buffer);
+    void encodeHeader(framing::Buffer& buffer);
+    void encodeContent(framing::Buffer& buffer);
+    /**
+     * @returns the size of the buffer needed to encode this
+     * message in its entirety
+     */
+    u_int32_t encodedSize();
+    /**
+     * @returns the size of the buffer needed to encode the
+     * 'header' of this message (not just the header frame,
+     * but other meta data e.g.routing key and exchange)
+     */
+    u_int32_t encodedHeaderSize();
+    /**
+     * @returns the size of the buffer needed to encode the
+     * (possibly partial) content held by this message
+     */
+    u_int32_t encodedContentSize();
+    /**
+     * Releases the in-memory content data held by this
+     * message. Must pass in a store from which the data can
+     * be reloaded.
+     */
+    void releaseContent(MessageStore* store);
+    /**
+     * If headers have been received, returns the expected
+     * content size else returns 0.
+     */
+    u_int64_t expectedContentSize();
+    /**
+     * Sets the 'content' implementation of this message (the
+     * message controls the lifecycle of the content instance
+     * it uses).
+     */
+    void setContent(std::auto_ptr<Content>& content);
+};
 
-    }
+}
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Fri Feb  2 14:03:10 2007
@@ -24,146 +24,148 @@
 
 #include "AMQContentBody.h"
 #include "AMQHeaderBody.h"
+#include "AMQMethodBody.h"
 #include "Content.h"
+#include "framing/amqp_types.h"
 
 #include <string>
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
 	
-	namespace framing {
-		class OutputHandler;
-		class ProtocolVersion;
-		class BasicHeaderProperties;
-	}
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+class BasicHeaderProperties;
+}
 	
-    namespace broker {
+namespace broker {
 
-		class MessageStore;
-		class ConnectionToken;
+class MessageStore;
+class ConnectionToken;
 
-        /**
-         * Base class for all types of internal broker messages
-         * abstracting away the operations
-         * TODO; AMS: for the moment this is mostly a placeholder
-         */
-        class Message{
-            std::string exchange;
-            std::string routingKey;
-            const bool mandatory;
-            const bool immediate;
-            u_int64_t persistenceId;
-
-            bool redelivered;
-
-        public:
-            typedef boost::shared_ptr<Message> shared_ptr;
-
-            Message(const std::string& _exchange, const std::string& _routingKey, 
-                    bool _mandatory, bool _immediate) :
-            	exchange(_exchange),
-            	routingKey(_routingKey),
-            	mandatory(_mandatory),
-            	immediate(_immediate),
-            	persistenceId(0),
-            	redelivered(false)
-           {}
-            
-            Message() :
-            	mandatory(false),
-            	immediate(false),
-            	persistenceId(0),
-            	redelivered(false)
-            {}
-
-            virtual ~Message() {};
-            
-            // Accessors
-            const std::string& getRoutingKey() const { return routingKey; }
-            const std::string& getExchange() const { return exchange; }
-            u_int64_t getPersistenceId() const { return persistenceId; }
-            bool getRedelivered() const { return redelivered; }
-            
-            void setRouting(const std::string& _exchange, const std::string& _routingKey)
-            { exchange = _exchange; routingKey = _routingKey; } 
-            void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
-			void redeliver() { redelivered = true; }
-
-			/**
-			 * Used to deliver the message from the queue
-			 */
-            virtual void deliver(qpid::framing::OutputHandler* out, 
-                         int channel, 
+/**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+class Message{
+    std::string exchange;
+    std::string routingKey;
+    const bool mandatory;
+    const bool immediate;
+    u_int64_t persistenceId;
+    bool redelivered;
+    framing::AMQMethodBody::shared_ptr respondTo;
+
+  public:
+    typedef boost::shared_ptr<Message> shared_ptr;
+
+    Message(const std::string& _exchange, const std::string& _routingKey, 
+            bool _mandatory, bool _immediate,
+            framing::AMQMethodBody::shared_ptr respondTo_) :
+        exchange(_exchange),
+        routingKey(_routingKey),
+        mandatory(_mandatory),
+        immediate(_immediate),
+        persistenceId(0),
+        redelivered(false),
+        respondTo(respondTo_)
+    {}
+            
+    Message() :
+        mandatory(false),
+        immediate(false),
+        persistenceId(0),
+        redelivered(false)
+    {}
+
+    virtual ~Message() {};
+            
+    // Accessors
+    const std::string& getRoutingKey() const { return routingKey; }
+    const std::string& getExchange() const { return exchange; }
+    u_int64_t getPersistenceId() const { return persistenceId; }
+    bool getRedelivered() const { return redelivered; }
+    framing::AMQMethodBody::shared_ptr getRespondTo() const {
+        return respondTo;
+    }
+    
+    void setRouting(const std::string& _exchange, const std::string& _routingKey)
+    { exchange = _exchange; routingKey = _routingKey; } 
+    void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+    void redeliver() { redelivered = true; }
+
+    /**
+     * Used to deliver the message from the queue
+     */
+    virtual void deliver(framing::ChannelAdapter& channel,
                          const std::string& consumerTag, 
                          u_int64_t deliveryTag, 
-                         u_int32_t framesize,
-			 			 qpid::framing::ProtocolVersion* version) = 0;
-			/**
-			 * Used to return a message in response to a get from a queue
-			 */
-            virtual void sendGetOk(qpid::framing::OutputHandler* out, 
-                           int channel, 
+                         u_int32_t framesize) = 0;
+    /**
+     * Used to return a message in response to a get from a queue
+     */
+    virtual void sendGetOk(const framing::MethodContext& context,
                            u_int32_t messageCount,
                            u_int64_t deliveryTag, 
-                           u_int32_t framesize,
-			   			   qpid::framing::ProtocolVersion* version) = 0;
+                           u_int32_t framesize) = 0;
             
-            virtual bool isComplete() = 0;
+    virtual bool isComplete() = 0;
             
-            virtual u_int64_t contentSize() const = 0;
-            virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
-            virtual bool isPersistent() = 0;
-            virtual const ConnectionToken* const getPublisher() = 0;
-
-            virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
-            virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
-
-            /**
-             * @returns the size of the buffer needed to encode this
-             * message in its entirety
-             * 
-             * XXXX: Only used in tests?
-             */
-            virtual u_int32_t encodedSize() = 0;
-            /**
-             * @returns the size of the buffer needed to encode the
-             * 'header' of this message (not just the header frame,
-             * but other meta data e.g.routing key and exchange)
-             * 
-             * XXXX: Only used in tests?
-             */
-            virtual u_int32_t encodedHeaderSize() = 0;
-            /**
-             * @returns the size of the buffer needed to encode the
-             * (possibly partial) content held by this message
-             */
-            virtual u_int32_t encodedContentSize() = 0;
-            /**
-             * If headers have been received, returns the expected
-             * content size else returns 0.
-             */
-            virtual u_int64_t expectedContentSize() = 0;
-            
-            // TODO: AMS 29/1/2007 Don't think these are really part of base class
-            
-            /**
-             * Sets the 'content' implementation of this message (the
-             * message controls the lifecycle of the content instance
-             * it uses).
-             */
-            virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
-            virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
-            virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
-            /**
-             * Releases the in-memory content data held by this
-             * message. Must pass in a store from which the data can
-             * be reloaded.
-             */
-            virtual void releaseContent(MessageStore* /*store*/) {};
-        };
+    virtual u_int64_t contentSize() const = 0;
+    virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+    virtual bool isPersistent() = 0;
+    virtual const ConnectionToken* const getPublisher() = 0;
+
+    virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+    virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+    /**
+     * @returns the size of the buffer needed to encode this
+     * message in its entirety
+     * 
+     * XXXX: Only used in tests?
+     */
+    virtual u_int32_t encodedSize() = 0;
+    /**
+     * @returns the size of the buffer needed to encode the
+     * 'header' of this message (not just the header frame,
+     * but other meta data e.g.routing key and exchange)
+     * 
+     * XXXX: Only used in tests?
+     */
+    virtual u_int32_t encodedHeaderSize() = 0;
+    /**
+     * @returns the size of the buffer needed to encode the
+     * (possibly partial) content held by this message
+     */
+    virtual u_int32_t encodedContentSize() = 0;
+    /**
+     * If headers have been received, returns the expected
+     * content size else returns 0.
+     */
+    virtual u_int64_t expectedContentSize() = 0;
+            
+    // TODO: AMS 29/1/2007 Don't think these are really part of base class
+            
+    /**
+     * Sets the 'content' implementation of this message (the
+     * message controls the lifecycle of the content instance
+     * it uses).
+     */
+    virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+    virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {};
+    virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {};
+    /**
+     * Releases the in-memory content data held by this
+     * message. Must pass in a store from which the data can
+     * be reloaded.
+     */
+    virtual void releaseContent(MessageStore* /*store*/) {};
+};
 
-    }
-}
+}}
 
 
 #endif  /*!_broker_BrokerMessage_h*/

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Fri Feb  2 14:03:10 2007
@@ -22,29 +22,28 @@
 
 using namespace qpid::broker;
 	
-MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody, 
-                 const std::string& _exchange, const std::string& _routingKey, 
-                 bool _mandatory, bool _immediate) :
-	Message(_exchange, _routingKey, _mandatory, _immediate),
-	methodBody(_methodBody)
+MessageMessage::MessageMessage(
+    const qpid::framing::AMQMethodBody::shared_ptr _methodBody, 
+    const std::string& _exchange, const std::string& _routingKey, 
+    bool _mandatory, bool _immediate) :
+    Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+    methodBody(_methodBody)
 {
 }
 
-void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/, 
-	int /*channel*/, 
-	const std::string& /*consumerTag*/, 
-	u_int64_t /*deliveryTag*/, 
-	u_int32_t /*framesize*/,
-	qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::deliver(
+    framing::ChannelAdapter& /*out*/, 
+    const std::string& /*consumerTag*/, 
+    u_int64_t /*deliveryTag*/, 
+    u_int32_t /*framesize*/)
 {
 }
 
-void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/, 
-	int /*channel*/, 
-	u_int32_t /*messageCount*/,
-	u_int64_t /*deliveryTag*/, 
-	u_int32_t /*framesize*/,
-	qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::sendGetOk(
+    const framing::MethodContext& /*context*/, 
+    u_int32_t /*messageCount*/,
+    u_int64_t /*deliveryTag*/, 
+    u_int32_t /*framesize*/)
 {
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Fri Feb  2 14:03:10 2007
@@ -25,47 +25,46 @@
 #include "BrokerMessageBase.h"
 
 namespace qpid {
-	namespace framing {
-		class AMQMethodBody;
-	}
+namespace framing {
+class AMQMethodBody;
+}
 	
-    namespace broker {
-        class MessageMessage: public Message{
-        	const qpid::framing::AMQMethodBody& methodBody;
+namespace broker {
+class MessageMessage: public Message{
+    const qpid::framing::AMQMethodBody::shared_ptr methodBody;
 
-        public:
-            MessageMessage(const qpid::framing::AMQMethodBody& methodBody, 
-            	const std::string& exchange, const std::string& routingKey, 
-            	bool mandatory, bool immediate);
+  public:
+    MessageMessage(
+        const framing::AMQMethodBody::shared_ptr methodBody, 
+        const std::string& exchange, const std::string& routingKey, 
+        bool mandatory, bool immediate);
             
-			// Default destructor okay
+    // Default destructor okay
 			            
-            void deliver(qpid::framing::OutputHandler* out, 
-                         int channel, 
-                         const std::string& consumerTag, 
-                         u_int64_t deliveryTag, 
-                         u_int32_t framesize,
-			 			 qpid::framing::ProtocolVersion* version);
-            void sendGetOk(qpid::framing::OutputHandler* out, 
-                           int channel, 
-                           u_int32_t messageCount,
-                           u_int64_t deliveryTag, 
-                           u_int32_t framesize,
-			   			   qpid::framing::ProtocolVersion* version);
-            bool isComplete();
+    void deliver(framing::ChannelAdapter& channel, 
+                 const std::string& consumerTag, 
+                 u_int64_t deliveryTag, 
+                 u_int32_t framesize);
+    
+    void sendGetOk(const framing::MethodContext& context, 
+                   u_int32_t messageCount,
+                   u_int64_t deliveryTag, 
+                   u_int32_t framesize);
+
+    bool isComplete();
             
-            u_int64_t contentSize() const;
-            qpid::framing::BasicHeaderProperties* getHeaderProperties();
-            bool isPersistent();
-            const ConnectionToken* const getPublisher();
+    u_int64_t contentSize() const;
+    qpid::framing::BasicHeaderProperties* getHeaderProperties();
+    bool isPersistent();
+    const ConnectionToken* const getPublisher();
             
-            u_int32_t encodedSize();
-            u_int32_t encodedHeaderSize();
-            u_int32_t encodedContentSize();
-            u_int64_t expectedContentSize();
-        };
+    u_int32_t encodedSize();
+    u_int32_t encodedHeaderSize();
+    u_int32_t encodedContentSize();
+    u_int64_t expectedContentSize();
+};
 
-    }
+}
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Fri Feb  2 14:03:10 2007
@@ -60,9 +60,11 @@
 
 
 void Connection::received(qpid::framing::AMQFrame* frame){
-    getAdapter(frame->getChannel()).handleBody(frame->getBody());
+    getChannel(frame->getChannel()).handleBody(frame->getBody());
 }
 
+// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
+// as it is part of the protocol.
 void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
     if (client.get())
         // TODO aconway 2007-01-16: correct error code.
@@ -72,12 +74,11 @@
     FieldTable properties;
     string mechanisms("PLAIN");
     string locales("en_US");
-    // TODO aconway 2007-01-16: Client call, move to adapter.
     client->getConnection().start(
-        MethodContext(0, &getAdapter(0)),
+        MethodContext(&getChannel(0)),
         header->getMajor(), header->getMinor(),
         properties, mechanisms, locales);
-    getAdapter(0).init(0, *out, client->getProtocolVersion());
+    getChannel(0).init(0, *out, client->getProtocolVersion());
 }
 
 void Connection::idleOut(){}
@@ -99,28 +100,19 @@
 
 void Connection::closeChannel(u_int16_t channel) {
     getChannel(channel).close(); 
-    adapters.erase(adapters.find(channel));
+    channels.erase(channels.find(channel));
 }
 
 
-BrokerAdapter& Connection::getAdapter(u_int16_t id) { 
-    AdapterMap::iterator i = adapters.find(id);
-    if (i == adapters.end()) {
-        std::auto_ptr<Channel> ch(
-            new Channel(
-                client->getProtocolVersion(), out, id,
-                framemax, broker.getQueues().getStore(),
-                settings.stagingThreshold));
-        BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
-        adapters.insert(id, adapter);
-        return *adapter;
-    }
-    else
-        return *i;
-}
-
-Channel& Connection::getChannel(u_int16_t id) {
-    return getAdapter(id).getChannel();
+Channel& Connection::getChannel(ChannelId id) {
+    ChannelMap::iterator i = channels.find(id);
+    if (i == channels.end()) {
+        i = channels.insert(
+            id, new Channel(
+                *this, id, framemax, broker.getQueues().getStore(),
+                settings.stagingThreshold)).first;
+    }        
+    return *i;
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Fri Feb  2 14:03:10 2007
@@ -34,7 +34,6 @@
 #include <sys/TimeoutHandler.h>
 #include "Broker.h"
 #include "Exception.h"
-#include "BrokerAdapter.h"
 
 namespace qpid {
 namespace broker {
@@ -47,19 +46,22 @@
     Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
 };
 
-class Connection : public qpid::sys::ConnectionInputHandler, 
+class Connection : public sys::ConnectionInputHandler, 
                    public ConnectionToken
 {
   public:
-    Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker);
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker);
     // ConnectionInputHandler methods
-    void received(qpid::framing::AMQFrame* frame);
-    void initiated(qpid::framing::ProtocolInitiation* header);
+    void received(framing::AMQFrame* frame);
+    void initiated(framing::ProtocolInitiation* header);
     void idleOut();
     void idleIn();
     void closed();
 
-    qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
+    sys::ConnectionOutputHandler& getOutput() { return *out; }
+
+    const framing::ProtocolVersion& getVersion() {
+        return client->getProtocolVersion(); }
 
     u_int32_t getFrameMax() const { return framemax; }
     u_int16_t getHeartbeat() const { return heartbeat; }
@@ -68,7 +70,7 @@
     void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
 
     Broker& broker;
-    std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
+    std::auto_ptr<framing::AMQP_ClientProxy> client;
     Settings settings;
 
     std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -81,20 +83,18 @@
      */
     Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
 
-    Channel& newChannel(u_int16_t channel);
-    Channel& getChannel(u_int16_t channel);
-    void closeChannel(u_int16_t channel);
+    Channel& newChannel(framing::ChannelId channel);
+    Channel& getChannel(framing::ChannelId channel);
+    void closeChannel(framing::ChannelId channel);
 
   private:
-    typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
+    typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
 
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
     Exchange::shared_ptr findExchange(const string& name);
 
-    BrokerAdapter& getAdapter(u_int16_t id);
-    
-    AdapterMap adapters;
-    qpid::sys::ConnectionOutputHandler* out;
+    ChannelMap channels;
+    sys::ConnectionOutputHandler* out;
     u_int32_t framemax;
     u_int16_t heartbeat;
 };

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Fri Feb  2 14:03:10 2007
@@ -24,21 +24,25 @@
 #include <AMQContentBody.h>
 #include <Buffer.h>
 #include <OutputHandler.h>
-#include <ProtocolVersion.h>
 
 namespace qpid {
-    namespace broker {
-        class Content{
-        public:
-            virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
-            virtual u_int32_t size() = 0;
-            virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
-            virtual void encode(qpid::framing::Buffer& buffer) = 0;
-            virtual void destroy() = 0;
-            virtual ~Content(){}
-        };
-    }
+
+namespace framing {
+class ChannelAdapter;
 }
+
+namespace broker {
+class Content{
+  public:
+    virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+    virtual u_int32_t size() = 0;
+    virtual void send(framing::ChannelAdapter& channel,
+                      u_int32_t framesize) = 0;
+    virtual void encode(qpid::framing::Buffer& buffer) = 0;
+    virtual void destroy() = 0;
+    virtual ~Content(){}
+};
+}}
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp Fri Feb  2 14:03:10 2007
@@ -20,6 +20,7 @@
  */
 #include <InMemoryContent.h>
 #include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -39,24 +40,26 @@
     return sum;
 }
 
-void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+// FIXME aconway 2007-02-01: Remove version parameter.
+void InMemoryContent::send(ChannelAdapter& channel, u_int32_t framesize)
 {
     for (content_iterator i = content.begin(); i != content.end(); i++) {
         if ((*i)->size() > framesize) {
             u_int32_t offset = 0;
             for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
                 string data = (*i)->getData().substr(offset, framesize);
-                out->send(new AMQFrame(version, channel, new AMQContentBody(data)));                
+                channel.send(new AMQContentBody(data)); 
                 offset += framesize;
             }
             u_int32_t remainder = (*i)->size() % framesize;
             if (remainder) {
                 string data = (*i)->getData().substr(offset, remainder);
-                out->send(new AMQFrame(version, channel, new AMQContentBody(data)));                
+                channel.send(new AMQContentBody(data)); 
             }
         } else {
-            AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
-            out->send(new AMQFrame(version, channel, contentBody));
+            AMQBody::shared_ptr contentBody =
+                static_pointer_cast<AMQBody, AMQContentBody>(*i);
+            channel.send(contentBody);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h Fri Feb  2 14:03:10 2007
@@ -35,7 +35,7 @@
         public:
             void add(qpid::framing::AMQContentBody::shared_ptr data);
             u_int32_t size();
-            void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            void send(framing::ChannelAdapter&, u_int32_t framesize);
             void encode(qpid::framing::Buffer& buffer);
             void destroy();
             ~InMemoryContent(){}