You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/18 07:27:52 UTC

svn commit: r497319 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ lib/client/ lib/common/framing/ tests/

Author: aconway
Date: Wed Jan 17 22:27:50 2007
New Revision: 497319

URL: http://svn.apache.org/viewvc?view=rev&rev=497319
Log:
There are a ton of FIXMES and request/response IDs are not yet working fully
but all tests are passing. 
* broker::Broker: Removed requester/responder from broker.
* framing::BodyHandler: added Requester/Responder to BodyHandler, becomes
  the base class for channel adapters in broker and client.
* broker::BrokerAdapter: Inherit BodyHandler, wraps a broker::Channel.
  Hide private *HandlerImpl detail classes in BodyHandler.cpp.
* broker::Connection: Requester/Responder/Adapter now per-channel.
  Connection channel map replaced with adapter map of BrokerAdapters.
  handle* functions moved to BrokerAdapter.
  All methods now handled by a BrokerAdapter for the relevant channel.
  ChannelHandlerImpl is repsonsible for checking that
  - No method on a non-0 channel is processed before open()
  - Channel 0 methods only happen on channel 0 and similar for non-zero methods
  Checks are not yet complete (see FIXMES)
* client::ResponseHandler: fix for client hang if broker crashs.

Removed:
    incubator/qpid/branches/qpid.0-9/cpp/tests/BodyHandlerTest.cpp
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Wed Jan 17 22:27:50 2007
@@ -29,8 +29,6 @@
 #include <SharedObject.h>
 #include <MessageStore.h>
 #include <AutoDelete.h>
-#include "Requester.h"
-#include "Responder.h"
 #include <ExchangeRegistry.h>
 #include <BrokerChannel.h>
 #include <ConnectionToken.h>
@@ -86,8 +84,6 @@
     u_int32_t getTimeout() { return timeout; }
     u_int64_t getStagingThreshold() { return stagingThreshold; }
     AutoDelete& getCleaner() { return cleaner; }
-    qpid::framing::Requester& getRequester() { return requester; }
-    qpid::framing::Responder& getResponder() { return responder; }
     
   private:
     Broker(const Configuration& config); 
@@ -100,8 +96,6 @@
     u_int64_t stagingThreshold;
     AutoDelete cleaner;
     ConnectionFactory factory;
-    qpid::framing::Requester requester;
-    qpid::framing::Responder responder;
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Wed Jan 17 22:27:50 2007
@@ -15,10 +15,11 @@
  * limitations under the License.
  *
  */
-
 #include "BrokerAdapter.h"
 #include "Connection.h"
 #include "Exception.h"
+#include "AMQMethodBody.h"
+#include "Exception.h"
 
 namespace qpid {
 namespace broker {
@@ -28,75 +29,263 @@
 
 typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
-BrokerAdapter::BrokerAdapter(Connection& c) :
-    connection(c),
-    basicHandler(c),
-    channelHandler(c),
-    connectionHandler(c),
-    exchangeHandler(c),
-    messageHandler(c),
-    queueHandler(c),
-    txHandler(c)    
-{}
-
-typedef qpid::framing::AMQP_ServerOperations Ops;
+class BrokerAdapter::ServerOps : public AMQP_ServerOperations
+{
+  public:
+    ServerOps(Channel& ch, Connection& c, Broker& b) :
+        basicHandler(ch, c, b),
+        channelHandler(ch, c, b),
+        connectionHandler(ch, c, b),
+        exchangeHandler(ch, c, b),
+        messageHandler(ch, c, b),
+        queueHandler(ch, c, b),
+        txHandler(ch, c, b)    
+    {}
+    
+    ChannelHandler* getChannelHandler() { return &channelHandler; }
+    ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+    BasicHandler* getBasicHandler() { return &basicHandler; }
+    ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+    QueueHandler* getQueueHandler() { return &queueHandler; }
+    TxHandler* getTxHandler() { return &txHandler;  }
+    MessageHandler* getMessageHandler() { return &messageHandler;  }
+    AccessHandler* getAccessHandler() {
+        throw ConnectionException(540, "Access class not implemented");  }
+    FileHandler* getFileHandler() {
+        throw ConnectionException(540, "File class not implemented");  }
+    StreamHandler* getStreamHandler() {
+        throw ConnectionException(540, "Stream class not implemented");  }
+    DtxHandler* getDtxHandler() {
+        throw ConnectionException(540, "Dtx class not implemented");  }
+    TunnelHandler* getTunnelHandler() {
+        throw ConnectionException(540, "Tunnel class not implemented"); }
+
+  private:
+    struct CoreRefs {
+        CoreRefs(Channel& ch, Connection& c, Broker& b)
+            : channel(ch), connection(c), broker(b) {}
+
+        Channel& channel;
+        Connection& connection;
+        Broker& broker;
+    };
+    
+    class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+      public:
+        ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+        void startOk(u_int16_t channel,
+                     const qpid::framing::FieldTable& clientProperties,
+                     const std::string& mechanism, const std::string& response,
+                     const std::string& locale); 
+        void secureOk(u_int16_t channel, const std::string& response); 
+        void tuneOk(u_int16_t channel, u_int16_t channelMax,
+                    u_int32_t frameMax, u_int16_t heartbeat); 
+        void open(u_int16_t channel, const std::string& virtualHost,
+                  const std::string& capabilities, bool insist); 
+        void close(u_int16_t channel, u_int16_t replyCode,
+                   const std::string& replyText,
+                   u_int16_t classId, u_int16_t methodId); 
+        void closeOk(u_int16_t channel); 
+    };
+
+    class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+      public:
+        ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void open(u_int16_t channel, const std::string& outOfBand); 
+        void flow(u_int16_t channel, bool active); 
+        void flowOk(u_int16_t channel, bool active); 
+        void ok( u_int16_t channel );
+        void ping( u_int16_t channel );
+        void pong( u_int16_t channel );
+        void resume( u_int16_t channel, const std::string& channelId );
+        void close(u_int16_t channel, u_int16_t replyCode, const
+                   std::string& replyText, u_int16_t classId, u_int16_t methodId); 
+        void closeOk(u_int16_t channel); 
+    };
+    
+    class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+      public:
+        ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void declare(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& type, 
+                     bool passive, bool durable, bool autoDelete,
+                     bool internal, bool nowait, 
+                     const qpid::framing::FieldTable& arguments); 
+        void delete_(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, bool ifUnused, bool nowait); 
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket, const std::string& queue,
+                    const std::string& exchange, const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+    };
+
+    class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+      public:
+        QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                     bool passive, bool durable, bool exclusive, 
+                     bool autoDelete, bool nowait,
+                     const qpid::framing::FieldTable& arguments); 
+        void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                  const std::string& exchange, const std::string& routingKey,
+                  bool nowait, const qpid::framing::FieldTable& arguments); 
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket,
+                    const std::string& queue,
+                    const std::string& exchange,
+                    const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+        void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                   bool nowait); 
+        void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                     bool ifUnused, bool ifEmpty, 
+                     bool nowait);
+    };
+
+    class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+      public:
+        BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void qos(u_int16_t channel, u_int32_t prefetchSize,
+                 u_int16_t prefetchCount, bool global); 
+        void consume(
+            u_int16_t channel, u_int16_t ticket, const std::string& queue,
+            const std::string& consumerTag, bool noLocal, bool noAck,
+            bool exclusive, bool nowait,
+            const qpid::framing::FieldTable& fields); 
+        void cancel(u_int16_t channel, const std::string& consumerTag,
+                    bool nowait); 
+        void publish(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& routingKey, 
+                     bool mandatory, bool immediate); 
+        void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                 bool noAck); 
+        void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
+        void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
+        void recover(u_int16_t channel, bool requeue); 
+    };
+
+    class TxHandlerImpl : private CoreRefs, public TxHandler{
+      public:
+        TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+        void select(u_int16_t channel);
+        void commit(u_int16_t channel);
+        void rollback(u_int16_t channel);
+    };
+
+    class MessageHandlerImpl : private CoreRefs, public MessageHandler {
+      public:
+        MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+        void append( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& bytes );
+
+        void cancel( u_int16_t channel,
+                     const std::string& destination );
+
+        void checkpoint( u_int16_t channel,
+                         const std::string& reference,
+                         const std::string& identifier );
+
+        void close( u_int16_t channel,
+                    const std::string& reference );
+
+        void consume( u_int16_t channel,
+                      u_int16_t ticket,
+                      const std::string& queue,
+                      const std::string& destination,
+                      bool noLocal,
+                      bool noAck,
+                      bool exclusive,
+                      const qpid::framing::FieldTable& filter );
+
+        void empty( u_int16_t channel );
+
+        void get( u_int16_t channel,
+                  u_int16_t ticket,
+                  const std::string& queue,
+                  const std::string& destination,
+                  bool noAck );
+
+        void offset( u_int16_t channel,
+                     u_int64_t value );
+
+        void ok( u_int16_t channel );
+
+        void open( u_int16_t channel,
+                   const std::string& reference );
+
+        void qos( u_int16_t channel,
+                  u_int32_t prefetchSize,
+                  u_int16_t prefetchCount,
+                  bool global );
+
+        void recover( u_int16_t channel,
+                      bool requeue );
+
+        void reject( u_int16_t channel,
+                     u_int16_t code,
+                     const std::string& text );
+
+        void resume( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& identifier );
+
+        void transfer( u_int16_t channel,
+                       u_int16_t ticket,
+                       const std::string& destination,
+                       bool redelivered,
+                       bool immediate,
+                       u_int64_t ttl,
+                       u_int8_t priority,
+                       u_int64_t timestamp,
+                       u_int8_t deliveryMode,
+                       u_int64_t expiration,
+                       const std::string& exchange,
+                       const std::string& routingKey,
+                       const std::string& messageId,
+                       const std::string& correlationId,
+                       const std::string& replyTo,
+                       const std::string& contentType,
+                       const std::string& contentEncoding,
+                       const std::string& userId,
+                       const std::string& appId,
+                       const std::string& transactionId,
+                       const std::string& securityToken,
+                       const qpid::framing::FieldTable& applicationHeaders,
+                       qpid::framing::Content body );
+    };
+
+    BasicHandlerImpl basicHandler;
+    ChannelHandlerImpl channelHandler;
+    ConnectionHandlerImpl connectionHandler;
+    ExchangeHandlerImpl exchangeHandler;
+    MessageHandlerImpl messageHandler;
+    QueueHandlerImpl queueHandler;
+    TxHandlerImpl txHandler;
 
-Ops::ChannelHandler* BrokerAdapter::getChannelHandler() {
-    return &channelHandler;
-}
-Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler() {
-    return &connectionHandler;
-}
-Ops::BasicHandler* BrokerAdapter::getBasicHandler() {
-    return &basicHandler;
-}
-Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler() {
-    return &exchangeHandler;
-}
-Ops::QueueHandler* BrokerAdapter::getQueueHandler() {
-    return &queueHandler;
-}
-Ops::TxHandler* BrokerAdapter::getTxHandler() {
-    return &txHandler; 
-}
-Ops::MessageHandler* BrokerAdapter::getMessageHandler() {
-    return &messageHandler; 
-}
-Ops::AccessHandler* BrokerAdapter::getAccessHandler() {
-    throw ConnectionException(540, "Access class not implemented"); 
-}
-Ops::FileHandler* BrokerAdapter::getFileHandler() {
-    throw ConnectionException(540, "File class not implemented"); 
-}
-Ops::StreamHandler* BrokerAdapter::getStreamHandler() {
-    throw ConnectionException(540, "Stream class not implemented"); 
-}
-Ops::DtxHandler* BrokerAdapter::getDtxHandler() {
-    throw ConnectionException(540, "Dtx class not implemented"); 
-}
-Ops::TunnelHandler* BrokerAdapter::getTunnelHandler() {
-    throw ConnectionException(540, "Tunnel class not implemented");
-}
+};
 
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
     u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, 
     const string& /*response*/, const string& /*locale*/){
     connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
 }
         
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
         
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
     connection.framemax = framemax;
     connection.heartbeat = heartbeat;
 }
         
-void BrokerAdapter::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
     string knownhosts;
     connection.client->getConnection().openOk(0, knownhosts);
 }
         
-void BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
     u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, 
     u_int16_t /*classId*/, u_int16_t /*methodId*/)
 {
@@ -104,47 +293,55 @@
     connection.context->close();
 } 
         
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
     connection.context->close();
 } 
               
-void BrokerAdapter::ChannelHandlerImpl::open(
-    u_int16_t channel, const string& /*outOfBand*/){
-    connection.openChannel(channel);
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+    u_int16_t channelId, const string& /*outOfBand*/){
+    // FIXME aconway 2007-01-17: Assertions on all channel methods,
+    // Drop channelId param.
+    assertChannelNonZero(channel.getId());
+    if (channel.isOpen())
+        throw ConnectionException(504, "Channel already open");
+    channel.open();
     // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
-    connection.client->getChannel().openOk(channel, std::string()/* ID */);
+    connection.client->getChannel().openOk(channelId, std::string()/* ID */);
 } 
         
-void BrokerAdapter::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}         
-void BrokerAdapter::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} 
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}         
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} 
         
-void BrokerAdapter::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, 
-                                                   u_int16_t /*classId*/, u_int16_t /*methodId*/){
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, 
+                                                         u_int16_t /*classId*/, u_int16_t /*methodId*/){
     connection.closeChannel(channel);
     connection.client->getChannel().closeOk(channel);
 } 
         
-void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} 
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} 
               
 
 
-void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, 
-                                                      bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
-                                                      const FieldTable& /*arguments*/){
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, 
+                                                            bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
+                                                            const FieldTable& /*arguments*/){
 
     if(passive){
-        if(!connection.broker.getExchanges().get(exchange)){
-            throw ChannelException(404, "Exchange not found: " + exchange);            
+        if(!broker.getExchanges().get(exchange)) {
+            throw ChannelException(404, "Exchange not found: " + exchange);
         }
     }else{        
         try{
-            std::pair<Exchange::shared_ptr, bool> response = connection.broker.getExchanges().declare(exchange, type);
+            std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
             if(!response.second && response.first->getType() != type){
-                throw ConnectionException(507, "Exchange already declared to be of type " 
-                                          + response.first->getType() + ", requested " + type);
+                throw ConnectionException(
+                    507,
+                    "Exchange already declared to be of type "
+                    + response.first->getType() + ", requested " + type);
             }
         }catch(UnknownExchangeTypeException& e){
-            throw ConnectionException(503, "Exchange type not implemented: " + type);
+            throw ConnectionException(
+                503, "Exchange type not implemented: " + type);
         }
     }
     if(!nowait){
@@ -153,7 +350,7 @@
 }
 
                 
-void BrokerAdapter::ExchangeHandlerImpl::unbind(
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
     u_int16_t /*channel*/,
     u_int16_t /*ticket*/,
     const string& /*queue*/,
@@ -166,23 +363,23 @@
 
 
                 
-void BrokerAdapter::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, 
-                                                      const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, 
+                                                            const string& exchange, bool /*ifUnused*/, bool nowait){
 
     //TODO: implement unused
-    connection.broker.getExchanges().destroy(exchange);
+    broker.getExchanges().destroy(exchange);
     if(!nowait) connection.client->getExchange().deleteOk(channel);
 } 
 
-void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
-                                                   bool passive, bool durable, bool exclusive, 
-                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
+                                                         bool passive, bool durable, bool exclusive, 
+                                                         bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
 	queue = connection.getQueue(name, channel);
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
-            connection.broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+            broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
@@ -192,11 +389,11 @@
             queue_created.first->create(arguments);
 
 	    //add default binding:
-	    connection.broker.getExchanges().getDefault()->bind(queue, name, 0);
+	    broker.getExchanges().getDefault()->bind(queue, name, 0);
 	    if (exclusive) {
 		connection.exclusiveQueues.push_back(queue);
 	    } else if(autoDelete){
-		connection.broker.getCleaner().add(queue);
+		broker.getCleaner().add(queue);
 	    }
 	}
     }
@@ -209,12 +406,12 @@
     }
 } 
         
-void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, 
-                                                const string& exchangeName, const string& routingKey, bool nowait, 
-                                                const FieldTable& arguments){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, 
+                                                      const string& exchangeName, const string& routingKey, bool nowait, 
+                                                      const FieldTable& arguments){
 
     Queue::shared_ptr queue = connection.getQueue(queueName, channel);
-    Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName);
+    Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
     if(exchange){
         // kpvdr - cannot use this any longer as routingKey is now const
         //        if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -223,19 +420,20 @@
         exchange->bind(queue, exchangeRoutingKey, &arguments);
         if(!nowait) connection.client->getQueue().bindOk(channel);    
     }else{
-        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+        throw ChannelException(
+            404, "Bind failed. No such exchange: " + exchangeName);
     }
 } 
         
-void BrokerAdapter::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
 
     Queue::shared_ptr queue = connection.getQueue(queueName, channel);
     int count = queue->purge();
     if(!nowait) connection.client->getQueue().purgeOk(channel, count);
 } 
         
-void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
-                                                   bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
+                                                         bool ifUnused, bool ifEmpty, bool nowait){
     ChannelException error(0, "");
     int count(0);
     Queue::shared_ptr q = connection.getQueue(queue, channel);
@@ -251,7 +449,7 @@
         }
         count = q->getMessageCount();
         q->destroy();
-        connection.broker.getQueues().destroy(queue);
+        broker.getQueues().destroy(queue);
     }
 
     if(!nowait) connection.client->getQueue().deleteOk(channel, count);
@@ -260,14 +458,14 @@
         
 
 
-void BrokerAdapter::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
     //TODO: handle global
     connection.getChannel(channel).setPrefetchSize(prefetchSize);
     connection.getChannel(channel).setPrefetchCount(prefetchCount);
     connection.client->getBasic().qosOk(channel);
 } 
         
-void BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
     u_int16_t channelId, u_int16_t /*ticket*/, 
     const string& queueName, const string& consumerTag, 
     bool noLocal, bool noAck, bool exclusive, 
@@ -296,26 +494,27 @@
 
 } 
         
-void BrokerAdapter::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
     connection.getChannel(channel).cancel(consumerTag);
 
     if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag);
 } 
         
-void BrokerAdapter::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
-                                                   const string& exchangeName, const string& routingKey, 
-                                                   bool mandatory, bool immediate){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
+                                                         const string& exchangeName, const string& routingKey, 
+                                                         bool mandatory, bool immediate){
 
-    Exchange::shared_ptr exchange = exchangeName.empty() ? connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
+    Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
     if(exchange){
         Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
         connection.getChannel(channel).handlePublish(msg, exchange);
     }else{
-        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+        throw ChannelException(
+            404, "Exchange not found '" + exchangeName + "'");
     }
 } 
         
-void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
     Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
     if(!connection.getChannel(channelId).get(queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
@@ -324,7 +523,7 @@
     }
 } 
         
-void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
     try{
         connection.getChannel(channel).ack(deliveryTag, multiple);
     }catch(InvalidAckException& e){
@@ -332,23 +531,23 @@
     }
 } 
         
-void BrokerAdapter::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
+void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
         
-void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
     connection.getChannel(channel).recover(requeue);
 } 
 
-void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){
     connection.getChannel(channel).begin();
     connection.client->getTx().selectOk(channel);
 }
 
-void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){
     connection.getChannel(channel).commit();
     connection.client->getTx().commitOk(channel);
 }
 
-void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){
     
     connection.getChannel(channel).rollback();
     connection.client->getTx().rollbackOk(channel);
@@ -356,7 +555,7 @@
 }
               
 void
-BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
     u_int16_t /*channel*/,
     u_int16_t /*ticket*/,
     const string& /*queue*/,
@@ -368,25 +567,25 @@
 }
 
 void
-BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
 {
     assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
-BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
 {
     assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
-BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
 {
     assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
-BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
     u_int16_t /*channel*/,
     const string& /*channelId*/ )
 {
@@ -395,143 +594,191 @@
 
 // Message class method handlers
 void
-BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*bytes*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/,
+                                                      const string& /*reference*/,
+                                                      const string& /*bytes*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 
 void
-BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
-                                                const string& /*destination*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+                                                      const string& /*destination*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
-                                                    const string& /*reference*/,
-                                                    const string& /*identifier*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+                                                          const string& /*reference*/,
+                                                          const string& /*identifier*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
-                                               const string& /*reference*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/,
+                                                     const string& /*reference*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/,
-                                                 u_int16_t /*ticket*/,
-                                                 const string& /*queue*/,
-                                                 const string& /*destination*/,
-                                                 bool /*noLocal*/,
-                                                 bool /*noAck*/,
-                                                 bool /*exclusive*/,
-                                                 const qpid::framing::FieldTable& /*filter*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+                                                       u_int16_t /*ticket*/,
+                                                       const string& /*queue*/,
+                                                       const string& /*destination*/,
+                                                       bool /*noLocal*/,
+                                                       bool /*noAck*/,
+                                                       bool /*exclusive*/,
+                                                       const qpid::framing::FieldTable& /*filter*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/,
-                                             u_int16_t /*ticket*/,
-                                             const string& /*queue*/,
-                                             const string& /*destination*/,
-                                             bool /*noAck*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/,
+                                                   u_int16_t /*ticket*/,
+                                                   const string& /*queue*/,
+                                                   const string& /*destination*/,
+                                                   bool /*noAck*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
-                                                u_int64_t /*value*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+                                                      u_int64_t /*value*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
-                                              const string& /*reference*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/,
+                                                    const string& /*reference*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/,
-                                             u_int32_t /*prefetchSize*/,
-                                             u_int16_t /*prefetchCount*/,
-                                             bool /*global*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+                                                   u_int32_t /*prefetchSize*/,
+                                                   u_int16_t /*prefetchCount*/,
+                                                   bool /*global*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/,
-                                                 bool /*requeue*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+                                                       bool /*requeue*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
-                                                u_int16_t /*code*/,
-                                                const string& /*text*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+                                                      u_int16_t /*code*/,
+                                                      const string& /*text*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*identifier*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+                                                      const string& /*reference*/,
+                                                      const string& /*identifier*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
-BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
-                                                  u_int16_t /*ticket*/,
-                                                  const string& /*destination*/,
-                                                  bool /*redelivered*/,
-                                                  bool /*immediate*/,
-                                                  u_int64_t /*ttl*/,
-                                                  u_int8_t /*priority*/,
-                                                  u_int64_t /*timestamp*/,
-                                                  u_int8_t /*deliveryMode*/,
-                                                  u_int64_t /*expiration*/,
-                                                  const string& /*exchange*/,
-                                                  const string& /*routingKey*/,
-                                                  const string& /*messageId*/,
-                                                  const string& /*correlationId*/,
-                                                  const string& /*replyTo*/,
-                                                  const string& /*contentType*/,
-                                                  const string& /*contentEncoding*/,
-                                                  const string& /*userId*/,
-                                                  const string& /*appId*/,
-                                                  const string& /*transactionId*/,
-                                                  const string& /*securityToken*/,
-                                                  const qpid::framing::FieldTable& /*applicationHeaders*/,
-                                                  qpid::framing::Content /*body*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+                                                        u_int16_t /*ticket*/,
+                                                        const string& /*destination*/,
+                                                        bool /*redelivered*/,
+                                                        bool /*immediate*/,
+                                                        u_int64_t /*ttl*/,
+                                                        u_int8_t /*priority*/,
+                                                        u_int64_t /*timestamp*/,
+                                                        u_int8_t /*deliveryMode*/,
+                                                        u_int64_t /*expiration*/,
+                                                        const string& /*exchange*/,
+                                                        const string& /*routingKey*/,
+                                                        const string& /*messageId*/,
+                                                        const string& /*correlationId*/,
+                                                        const string& /*replyTo*/,
+                                                        const string& /*contentType*/,
+                                                        const string& /*contentEncoding*/,
+                                                        const string& /*userId*/,
+                                                        const string& /*appId*/,
+                                                        const string& /*transactionId*/,
+                                                        const string& /*securityToken*/,
+                                                        const qpid::framing::FieldTable& /*applicationHeaders*/,
+                                                        qpid::framing::Content /*body*/ )
 {
     assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
+BrokerAdapter::BrokerAdapter(
+    Channel* ch, Connection& c, Broker& b
+) :
+    channel(ch),
+    connection(c),
+    broker(b),
+    serverOps(new ServerOps(*ch,c,b))
+{
+    assert(ch);
+}
+
+void BrokerAdapter::handleMethod(
+    boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+{
+    try{
+        // FIXME aconway 2007-01-17: invoke to take Channel&?
+        method->invoke(*serverOps, channel->getId());
+    }catch(ChannelException& e){
+        connection.closeChannel(channel->getId());
+        connection.client->getChannel().close(
+            channel->getId(), e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+    }catch(ConnectionException& e){
+        connection.client->getConnection().close(
+            0, e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        connection.client->getConnection().close(
+            0, 541/*internal error*/, e.what(),
+            method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+    channel->handleHeader(body);
+}
+
+void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
+    channel->handleContent(body);
+}
+
+void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+    // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
 }} // namespace qpid::broker
+

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Wed Jan 17 22:27:50 2007
@@ -20,237 +20,45 @@
  */
 
 #include "AMQP_ServerOperations.h"
+#include "BodyHandler.h"
+#include "BrokerChannel.h"
 
 namespace qpid {
 namespace broker {
 
+class AMQMethodBody;
 class Connection;
+class Broker;
+
+// FIXME aconway 2007-01-17: Rename to ChannelAdapter.
 
 /**
- * Protocol adapter class for the broker.
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Broker objects.
+ *
+ * Owns a channel, has references to Connection and Broker.
  */
-class BrokerAdapter : public qpid::framing::AMQP_ServerOperations 
+class BrokerAdapter : public qpid::framing::BodyHandler
 {
   public:
-    BrokerAdapter(Connection& connection);
-    AccessHandler* getAccessHandler();
-    BasicHandler* getBasicHandler();
-    ChannelHandler* getChannelHandler();
-    ConnectionHandler* getConnectionHandler();
-    DtxHandler* getDtxHandler();
-    ExchangeHandler* getExchangeHandler();
-    FileHandler* getFileHandler();
-    MessageHandler* getMessageHandler();
-    QueueHandler* getQueueHandler();
-    StreamHandler* getStreamHandler();
-    TunnelHandler* getTunnelHandler();
-    TxHandler* getTxHandler();
+    // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel>
+    BrokerAdapter(Channel* ch, Connection&, Broker&);
+    Channel& getChannel() { return *channel; }
+
+    void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
+    void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+    void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+    void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
 
   private:
+    class ServerOps;
 
-    class ConnectionHandlerImpl : public ConnectionHandler{
-        Connection& connection;
-      public:
-        ConnectionHandlerImpl(Connection& c) : connection(c) {}
-
-        void startOk(u_int16_t channel,
-                     const qpid::framing::FieldTable& clientProperties,
-                     const std::string& mechanism, const std::string& response,
-                     const std::string& locale); 
-        void secureOk(u_int16_t channel, const std::string& response); 
-        void tuneOk(u_int16_t channel, u_int16_t channelMax,
-                    u_int32_t frameMax, u_int16_t heartbeat); 
-        void open(u_int16_t channel, const std::string& virtualHost,
-                  const std::string& capabilities, bool insist); 
-        void close(u_int16_t channel, u_int16_t replyCode,
-                   const std::string& replyText,
-                   u_int16_t classId, u_int16_t methodId); 
-        void closeOk(u_int16_t channel); 
-    };
-
-    class ChannelHandlerImpl : public ChannelHandler{
-        Connection& connection;
-      public:
-        ChannelHandlerImpl(Connection& c) : connection(c) {}
-        void open(u_int16_t channel, const std::string& outOfBand); 
-        void flow(u_int16_t channel, bool active); 
-        void flowOk(u_int16_t channel, bool active); 
-        void ok( u_int16_t channel );
-        void ping( u_int16_t channel );
-        void pong( u_int16_t channel );
-        void resume( u_int16_t channel, const std::string& channelId );
-        void close(u_int16_t channel, u_int16_t replyCode, const
-                   std::string& replyText, u_int16_t classId, u_int16_t methodId); 
-        void closeOk(u_int16_t channel); 
-    };
-    
-    class ExchangeHandlerImpl : public ExchangeHandler{
-        Connection& connection;
-      public:
-        ExchangeHandlerImpl(Connection& c) : connection(c) {}
-        void declare(u_int16_t channel, u_int16_t ticket,
-                     const std::string& exchange, const std::string& type, 
-                     bool passive, bool durable, bool autoDelete,
-                     bool internal, bool nowait, 
-                     const qpid::framing::FieldTable& arguments); 
-        void delete_(u_int16_t channel, u_int16_t ticket,
-                     const std::string& exchange, bool ifUnused, bool nowait); 
-        void unbind(u_int16_t channel,
-                    u_int16_t ticket, const std::string& queue,
-                    const std::string& exchange, const std::string& routingKey,
-                    const qpid::framing::FieldTable& arguments );
-    };
-
-    class QueueHandlerImpl : public QueueHandler{
-        Connection& connection;
-      public:
-        QueueHandlerImpl(Connection& c) : connection(c) {}
-        void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
-                     bool passive, bool durable, bool exclusive, 
-                     bool autoDelete, bool nowait,
-                     const qpid::framing::FieldTable& arguments); 
-        void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
-                  const std::string& exchange, const std::string& routingKey,
-                  bool nowait, const qpid::framing::FieldTable& arguments); 
-        void unbind(u_int16_t channel,
-                    u_int16_t ticket,
-                    const std::string& queue,
-                    const std::string& exchange,
-                    const std::string& routingKey,
-                    const qpid::framing::FieldTable& arguments );
-        void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
-                   bool nowait); 
-        void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
-                     bool ifUnused, bool ifEmpty, 
-                     bool nowait); 
-    };
-
-    class BasicHandlerImpl : public BasicHandler{
-        Connection& connection;
-      public:
-        BasicHandlerImpl(Connection& c) : connection(c) {}
-        void qos(u_int16_t channel, u_int32_t prefetchSize,
-                 u_int16_t prefetchCount, bool global); 
-        void consume(
-            u_int16_t channel, u_int16_t ticket, const std::string& queue,
-            const std::string& consumerTag, bool noLocal, bool noAck,
-            bool exclusive, bool nowait,
-            const qpid::framing::FieldTable& fields); 
-        void cancel(u_int16_t channel, const std::string& consumerTag,
-                    bool nowait); 
-        void publish(u_int16_t channel, u_int16_t ticket,
-                     const std::string& exchange, const std::string& routingKey, 
-                     bool mandatory, bool immediate); 
-        void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
-                 bool noAck); 
-        void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
-        void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
-        void recover(u_int16_t channel, bool requeue); 
-    };
-
-    class TxHandlerImpl : public TxHandler{
-        Connection& connection;
-      public:
-        TxHandlerImpl(Connection& c) : connection(c) {}
-        void select(u_int16_t channel);
-        void commit(u_int16_t channel);
-        void rollback(u_int16_t channel);
-    };
-
-    class MessageHandlerImpl : public MessageHandler {
-        Connection& connection;
-      public:
-        MessageHandlerImpl(Connection& c) : connection(c) {}
-
-        void append( u_int16_t channel,
-                     const std::string& reference,
-                     const std::string& bytes );
-
-        void cancel( u_int16_t channel,
-                     const std::string& destination );
-
-        void checkpoint( u_int16_t channel,
-                         const std::string& reference,
-                         const std::string& identifier );
-
-        void close( u_int16_t channel,
-                    const std::string& reference );
-
-        void consume( u_int16_t channel,
-                      u_int16_t ticket,
-                      const std::string& queue,
-                      const std::string& destination,
-                      bool noLocal,
-                      bool noAck,
-                      bool exclusive,
-                      const qpid::framing::FieldTable& filter );
-
-        void empty( u_int16_t channel );
-
-        void get( u_int16_t channel,
-                  u_int16_t ticket,
-                  const std::string& queue,
-                  const std::string& destination,
-                  bool noAck );
-
-        void offset( u_int16_t channel,
-                     u_int64_t value );
-
-        void ok( u_int16_t channel );
-
-        void open( u_int16_t channel,
-                   const std::string& reference );
-
-        void qos( u_int16_t channel,
-                  u_int32_t prefetchSize,
-                  u_int16_t prefetchCount,
-                  bool global );
-
-        void recover( u_int16_t channel,
-                      bool requeue );
-
-        void reject( u_int16_t channel,
-                     u_int16_t code,
-                     const std::string& text );
-
-        void resume( u_int16_t channel,
-                     const std::string& reference,
-                     const std::string& identifier );
-
-        void transfer( u_int16_t channel,
-                       u_int16_t ticket,
-                       const std::string& destination,
-                       bool redelivered,
-                       bool immediate,
-                       u_int64_t ttl,
-                       u_int8_t priority,
-                       u_int64_t timestamp,
-                       u_int8_t deliveryMode,
-                       u_int64_t expiration,
-                       const std::string& exchange,
-                       const std::string& routingKey,
-                       const std::string& messageId,
-                       const std::string& correlationId,
-                       const std::string& replyTo,
-                       const std::string& contentType,
-                       const std::string& contentEncoding,
-                       const std::string& userId,
-                       const std::string& appId,
-                       const std::string& transactionId,
-                       const std::string& securityToken,
-                       const qpid::framing::FieldTable& applicationHeaders,
-                       qpid::framing::Content body );
-    };
-
+    std::auto_ptr<Channel> channel;
     Connection& connection;
-
-    BasicHandlerImpl basicHandler;
-    ChannelHandlerImpl channelHandler;
-    ConnectionHandlerImpl connectionHandler;
-    ExchangeHandlerImpl exchangeHandler;
-    MessageHandlerImpl messageHandler;
-    QueueHandlerImpl queueHandler;
-    TxHandlerImpl txHandler;
+    Broker& broker;
+    boost::shared_ptr<ServerOps> serverOps;
 };
   
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Jan 17 22:27:50 2007
@@ -18,12 +18,13 @@
  * under the License.
  *
  */
-#include <BrokerChannel.h>
-#include <QpidError.h>
 #include <iostream>
 #include <sstream>
 #include <assert.h>
 
+#include <BrokerChannel.h>
+#include <QpidError.h>
+
 using std::mem_fun_ref;
 using std::bind2nd;
 using namespace qpid::broker;
@@ -31,9 +32,13 @@
 using namespace qpid::sys;
 
 
-Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
-    id(_id), 
-    out(_out), 
+Channel::Channel(
+    const ProtocolVersion& _version, OutputHandler* _out, int _id,
+    u_int32_t _framesize, MessageStore* const _store,
+    u_int64_t _stagingThreshold
+) :
+    id(_id),
+    out(*_out),
     currentDeliveryTag(1),
     transactional(false),
     prefetchSize(0),
@@ -43,9 +48,8 @@
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
     version(_version),
-    isClosed(false)
+    opened(false)
 {
-
     outstanding.reset();
 }
 
@@ -57,7 +61,7 @@
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) {
 	if(tag.empty()) tag = tagGenerator.generate();
     ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
@@ -86,8 +90,8 @@
 }
 
 void Channel::close(){
-    if (!isClosed) {
-        isClosed = true;
+    if (isOpen()) {
+        opened = false;
         while (!consumers.empty()) 
             cancel(consumers.begin());
         //requeue:
@@ -123,7 +127,7 @@
         outstanding.count++;
     }
     //send deliver method, header and content(s)
-    msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+    msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -180,6 +184,10 @@
     messageBuilder.addContent(content);
 }
 
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+    // TODO aconway 2007-01-17: Implement heartbeating.
+}
+
 void Channel::complete(Message::shared_ptr& msg){
     if(exchange){
         if(transactional){
@@ -247,7 +255,7 @@
     if(msg){
         Mutex::ScopedLock locker(deliveryLock);
         u_int64_t myDeliveryTag = currentDeliveryTag++;
-        msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+        msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -258,5 +266,6 @@
 }
 
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
-    msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+    msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
 }
+

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Jan 17 22:27:50 2007
@@ -48,83 +48,94 @@
 #include <BasicPublishBody.h>
 
 namespace qpid {
-    namespace broker {
-        using qpid::framing::string;
+namespace broker {
 
-        /**
-         * Maintains state for an AMQP channel. Handles incoming and
-         * outgoing messages for that channel.
-         */
-        class Channel : private MessageBuilder::CompletionHandler{
-            class ConsumerImpl : public virtual Consumer{
-                Channel* parent;
-                const string tag;
-                Queue::shared_ptr queue;
-                ConnectionToken* const connection;
-                const bool ackExpected;
-                bool blocked;
-            public:
-                ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
-                virtual bool deliver(Message::shared_ptr& msg);            
-                void cancel();
-                void requestDispatch();
-            };
-
-            typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; 
-            const int id;
-            qpid::framing::OutputHandler* out;
-            u_int64_t currentDeliveryTag;
-            Queue::shared_ptr defaultQueue;
-            bool transactional;
-            std::map<string, ConsumerImpl*> consumers;
-            u_int32_t prefetchSize;    
-            u_int16_t prefetchCount;    
-            Prefetch outstanding;
-            u_int32_t framesize;
-            NameGenerator tagGenerator;
-            std::list<DeliveryRecord> unacked;
-            qpid::sys::Mutex deliveryLock;
-            TxBuffer txBuffer;
-            AccumulatedAck accumulatedAck;
-            MessageStore* const store;
-            MessageBuilder messageBuilder;//builder for in-progress message
-            Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
-	    qpid::framing::ProtocolVersion version; // version used for this channel
-            bool isClosed;
-
-            virtual void complete(Message::shared_ptr& msg);
-            void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);            
-            void cancel(consumer_iterator consumer);
-            bool checkPrefetch(Message::shared_ptr& msg);
+using qpid::framing::string;
+
+/**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
+class Channel : private MessageBuilder::CompletionHandler
+{
+    class ConsumerImpl : public virtual Consumer
+    {
+        Channel* parent;
+        const string tag;
+        Queue::shared_ptr queue;
+        ConnectionToken* const connection;
+        const bool ackExpected;
+        bool blocked;
+      public:
+        ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+        virtual bool deliver(Message::shared_ptr& msg);            
+        void cancel();
+        void requestDispatch();
+    };
+
+    typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+    u_int16_t id;
+    qpid::framing::OutputHandler& out;
+    u_int64_t currentDeliveryTag;
+    Queue::shared_ptr defaultQueue;
+    bool transactional;
+    std::map<string, ConsumerImpl*> consumers;
+    u_int32_t prefetchSize;    
+    u_int16_t prefetchCount;    
+    Prefetch outstanding;
+    u_int32_t framesize;
+    NameGenerator tagGenerator;
+    std::list<DeliveryRecord> unacked;
+    qpid::sys::Mutex deliveryLock;
+    TxBuffer txBuffer;
+    AccumulatedAck accumulatedAck;
+    MessageStore* const store;
+    MessageBuilder messageBuilder;//builder for in-progress message
+    Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
+    qpid::framing::ProtocolVersion version; // version used for this channel
+    bool opened;
+
+    virtual void complete(Message::shared_ptr& msg);
+    void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);            
+    void cancel(consumer_iterator consumer);
+    bool checkPrefetch(Message::shared_ptr& msg);
         
-        public:
-            Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize, 
-                    MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
-            ~Channel();
-            inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
-            inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
-            inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
-            inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; }
-            bool exists(const string& consumerTag);
-            void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive,
-                         ConnectionToken* const connection = 0, const qpid::framing::FieldTable* = 0);
-            void cancel(const string& tag);
-            bool get(Queue::shared_ptr queue, bool ackExpected);
-            void begin();
-            void close();
-            void commit();
-            void rollback();
-            void ack(u_int64_t deliveryTag, bool multiple);
-            void recover(bool requeue);
-            void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);            
-            void handlePublish(Message* msg, Exchange::shared_ptr exchange);
-            void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
-            void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
-        };
-
-        struct InvalidAckException{};
-    }
-}
+  public:
+    Channel(
+        const qpid::framing::ProtocolVersion& _version,
+        qpid::framing::OutputHandler* out, int id, u_int32_t framesize, 
+        MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+    ~Channel();
+    bool isOpen() const { return opened; }
+    void open() { opened = true; }
+    u_int16_t getId() const { return id; }
+    void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+    Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
+    u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
+    u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; }
+
+    bool exists(const string& consumerTag);
+    void consume(string& tag, Queue::shared_ptr queue, bool acks,
+                 bool exclusive, ConnectionToken* const connection = 0,
+                 const qpid::framing::FieldTable* = 0);
+    void cancel(const string& tag);
+    bool get(Queue::shared_ptr queue, bool ackExpected);
+    void begin();
+    void close();
+    void commit();
+    void rollback();
+    void ack(u_int64_t deliveryTag, bool multiple);
+    void recover(bool requeue);
+    void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);            
+    void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+    void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr);
+    void handleContent(qpid::framing::AMQContentBody::shared_ptr);
+    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr);
+};
+
+struct InvalidAckException{};
+
+}} // namespace qpid::broker
 
 
 #endif  /*!_broker_BrokerChannel_h*/

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Wed Jan 17 22:27:50 2007
@@ -23,10 +23,6 @@
 
 #include "Connection.h"
 
-// TODO aconway 2007-01-16: move to channel.
-#include "Requester.h"
-#include "Responder.h"
-
 using namespace boost;
 using namespace qpid::sys;
 using namespace qpid::framing;
@@ -36,9 +32,6 @@
 namespace broker {
 
 Connection::Connection(SessionContext* context_, Broker& broker_) :
-    adapter(*this),
-    requester(broker.getRequester()),
-    responder(broker.getResponder()),
     context(context_), 
     framemax(65536), 
     heartbeat(0),
@@ -65,89 +58,15 @@
     return broker.getExchanges().get(name);
 }
 
-void Connection::handleMethod(
-    u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
-{
-    AMQMethodBody::shared_ptr method =
-        shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
-    try{
-        method->invoke(adapter, channel);
-    }catch(ChannelException& e){
-        closeChannel(channel);
-        client->getChannel().close(
-            channel, e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-    }catch(ConnectionException& e){
-        client->getConnection().close(
-            0, e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-    }catch(std::exception& e){
-        client->getConnection().close(
-            0, 541/*internal error*/, e.what(),
-            method->amqpClassId(), method->amqpMethodId());
-    }
-}
 
 void Connection::received(qpid::framing::AMQFrame* frame){
-    u_int16_t channel = frame->getChannel();
-    AMQBody::shared_ptr body = frame->getBody();
-    switch(body->type())
-    {
-      case REQUEST_BODY:
-        responder.received(AMQRequestBody::getData(body));
-        handleMethod(channel, body);
-        break;
-      case RESPONSE_BODY:
-        // Must process responses before marking them received.
-        handleMethod(channel, body);     
-        requester.processed(AMQResponseBody::getData(body));
-        break;
-        // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
-      case METHOD_BODY:    
-        handleMethod(channel, body);
-        break;
-      case HEADER_BODY:
-	handleHeader(
-            channel, shared_polymorphic_cast<AMQHeaderBody>(body));
-	break;
-
-      case CONTENT_BODY:
-	handleContent(
-            channel, shared_polymorphic_cast<AMQContentBody>(body));
-	break;
-
-      case HEARTBEAT_BODY:
-        assert(channel == 0);
-	handleHeartbeat(
-            shared_polymorphic_cast<AMQHeartbeatBody>(body));
-	break;
-    }
-}
-
-/**
- * An OutputHandler that does request/response procssing before
- * delgating to another OutputHandler.
- */
-Connection::Sender::Sender(
-    OutputHandler& oh, Requester& req, Responder& resp)
-    : out(oh), requester(req), responder(resp)
-{}
-
-void Connection::Sender::send(AMQFrame* frame) {
-    AMQBody::shared_ptr body =  frame->getBody();
-    u_int16_t type = body->type();
-    if (type == REQUEST_BODY)
-        requester.sending(AMQRequestBody::getData(body));
-    else if (type == RESPONSE_BODY)
-        responder.sending(AMQResponseBody::getData(body));
-    out.send(frame);
+    getAdapter(frame->getChannel()).handleBody(frame->getBody());
 }
 
 void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
     if (client.get())
-        // TODO aconway 2007-01-16: correct code.
+        // TODO aconway 2007-01-16: correct error code.
         throw ConnectionException(0, "Connection initiated twice");
-
     client.reset(new qpid::framing::AMQP_ClientProxy(
                      context, header->getMajor(), header->getMinor()));
     FieldTable properties;
@@ -159,7 +78,6 @@
         mechanisms, locales);
 }
 
-
 void Connection::idleOut(){}
 
 void Connection::idleIn(){}
@@ -177,42 +95,29 @@
     }
 }
 
-// TODO aconway 2007-01-16: colapse these. 
-void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
-    getChannel(channel).handleHeader(body);
+void Connection::closeChannel(u_int16_t channel) {
+    getChannel(channel).close(); 
+    adapters.erase(adapters.find(channel));
 }
 
-void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
-    getChannel(channel).handleContent(body);
-}
 
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-    std::cout << "Connection::handleHeartbeat()" << std::endl;
-}
-
-void Connection::openChannel(u_int16_t channel) {
-    if (channel == 0)
-        throw ConnectionException(504, "Illegal channel 0");
-    if (channels.find(channel) != channels.end())
-        throw ConnectionException(504, "Channel already open: " + channel);
-    channels.insert(
-        channel,
-        new Channel(
-            client->getProtocolVersion(), context, channel, framemax,
-            broker.getQueues().getStore(), settings.stagingThreshold));
-}
-
-void Connection::closeChannel(u_int16_t channel) {
-    getChannel(channel).close(); // throws if channel does not exist.
-    channels.erase(channels.find(channel));
+BrokerAdapter& Connection::getAdapter(u_int16_t id) { 
+    AdapterMap::iterator i = adapters.find(id);
+    if (i == adapters.end()) {
+        Channel* ch=new Channel(
+            client->getProtocolVersion(), context, id,
+            framemax, broker.getQueues().getStore(),
+            settings.stagingThreshold);
+        BrokerAdapter* adapter =  new BrokerAdapter(ch, *this, broker);
+        adapters.insert(id, adapter);
+        return *adapter;
+    }
+    else
+        return *i;
 }
 
-
-Channel& Connection::getChannel(u_int16_t channel){
-    ChannelMap::iterator i = channels.find(channel);
-    if(i == channels.end())
-        throw ConnectionException(504, "Unknown channel: " + channel);
-    return *i;
+Channel& Connection::getChannel(u_int16_t id) {
+    return getAdapter(id).getChannel();
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Wed Jan 17 22:27:50 2007
@@ -33,8 +33,8 @@
 #include <sys/ConnectionInputHandler.h>
 #include <sys/TimeoutHandler.h>
 #include "Broker.h"
-#include "BrokerAdapter.h"
 #include "Exception.h"
+#include "BrokerAdapter.h"
 
 namespace qpid {
 namespace broker {
@@ -50,38 +50,15 @@
 class Connection : public qpid::sys::ConnectionInputHandler, 
                    public ConnectionToken
 {
-    typedef boost::ptr_map<u_int16_t, Channel> ChannelMap;
-
-    // TODO aconway 2007-01-16: belongs on broker.
-    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
-    class Sender : public qpid::framing::OutputHandler {
-      public:
-        Sender(qpid::framing::OutputHandler&,
-               qpid::framing::Requester&, qpid::framing::Responder&);
-        void send(qpid::framing::AMQFrame* frame);
-      private:
-        OutputHandler& out;
-        qpid::framing::Requester& requester;
-        qpid::framing::Responder& responder;
-    };
-
-    BrokerAdapter adapter;
-    // FIXME aconway 2007-01-16: On Channel
-    qpid::framing::Requester& requester;
-    qpid::framing::Responder& responder;
-    ChannelMap channels;
-
-    void handleHeader(u_int16_t channel,
-                      qpid::framing::AMQHeaderBody::shared_ptr body);
-    void handleContent(u_int16_t channel,
-                       qpid::framing::AMQContentBody::shared_ptr body);
-    void handleMethod(u_int16_t channel,
-                      qpid::framing::AMQBody::shared_ptr body);
-    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+    typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
 
     // FIXME aconway 2007-01-16: on broker.
+    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
     Exchange::shared_ptr findExchange(const string& name);
+
+    BrokerAdapter& getAdapter(u_int16_t id);
+    
+    AdapterMap adapters;
     
   public:
     Connection(qpid::sys::SessionContext* context, Broker& broker);
@@ -111,10 +88,9 @@
      */
     Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
 
-
-    void openChannel(u_int16_t channel);
-    void closeChannel(u_int16_t channel);
+    Channel& newChannel(u_int16_t channel);
     Channel& getChannel(u_int16_t channel);
+    void closeChannel(u_int16_t channel);
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Wed Jan 17 22:27:50 2007
@@ -253,4 +253,5 @@
     for(iterator i = channels.begin(); i != channels.end(); i++){
         i->second->stop();
     }
+    responses.signalResponse(AMQMethodBody::shared_ptr());
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp Wed Jan 17 22:27:50 2007
@@ -29,28 +29,28 @@
 qpid::client::ResponseHandler::~ResponseHandler(){}
 
 bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
-    return expected.match(response.get());
+    return response != 0 && expected.match(response.get());
 }
 
 void qpid::client::ResponseHandler::waitForResponse(){
     Monitor::ScopedLock l(monitor);
-    if(waiting){
+    while (waiting)
 	monitor.wait();
-    }
 }
 
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
-    response = _response;
+void qpid::client::ResponseHandler::signalResponse(
+    qpid::framing::AMQMethodBody::shared_ptr _response)
+{
     Monitor::ScopedLock l(monitor);
+    response = _response;
     waiting = false;
     monitor.notify();
 }
 
 void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
     Monitor::ScopedLock l(monitor);
-    if(waiting){
+    while (waiting)
 	monitor.wait();
-    }
     if(!validate(expected)){
 	THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
     }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp Wed Jan 17 22:27:50 2007
@@ -18,38 +18,62 @@
  * under the License.
  *
  */
-#include <boost/shared_ptr.hpp>
-#include <BodyHandler.h>
+#include "QpidError.h"
+#include "BodyHandler.h"
+#include <AMQRequestBody.h>
+#include <AMQResponseBody.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
 
 using namespace qpid::framing;
 using namespace boost;
 
 BodyHandler::~BodyHandler() {}
 
-void BodyHandler::handleBody(const AMQBody::shared_ptr& body){
-
+void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
     switch(body->type())
     {
-      case METHOD_BODY:
       case REQUEST_BODY:
+        handleRequest(shared_polymorphic_cast<AMQRequestBody>(body));
+        break; 
       case RESPONSE_BODY:
-	handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body));
+        handleResponse(shared_polymorphic_cast<AMQResponseBody>(body));
+        break;
+      case METHOD_BODY:
+	handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
 	break;
- 
       case HEADER_BODY:
-	handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+	handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body));
 	break;
-
       case CONTENT_BODY:
-	handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+	handleContent(shared_polymorphic_cast<AMQContentBody>(body));
 	break;
-
       case HEARTBEAT_BODY:
-	handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+	handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body));
 	break;
-
       default:
-	throw UnknownBodyType(body->type());
+        QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
     }
+}
 
+void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) {
+    responder.received(request->getData());
+    handleMethod(request);
+}
+
+void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) {
+    handleMethod(response);
+    requester.processed(response->getData());
+}
+
+void BodyHandler::assertChannelZero(u_int16_t id) {
+    if (id != 0)
+        throw ConnectionException(504, "Invalid channel id, not 0");
+}
+
+void BodyHandler::assertChannelNonZero(u_int16_t id) {
+    if (id == 0)
+        throw ConnectionException(504, "Invalid channel id 0");
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h Wed Jan 17 22:27:50 2007
@@ -1,3 +1,6 @@
+#ifndef _BodyHandler_
+#define _BodyHandler_
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,37 +21,55 @@
  * under the License.
  *
  */
-#include <string>
 
-#ifndef _BodyHandler_
-#define _BodyHandler_
+#include <boost/shared_ptr.hpp>
 
-#include <AMQMethodBody.h>
-#include <AMQHeaderBody.h>
-#include <AMQContentBody.h>
-#include <AMQHeartbeatBody.h>
+#include "Requester.h"
+#include "Responder.h"
 
 namespace qpid {
 namespace framing {
 
-    class BodyHandler{
-    public:
-        virtual ~BodyHandler();
-	virtual void handleMethod(AMQMethodBody::shared_ptr body) = 0;
-	virtual void handleHeader(AMQHeaderBody::shared_ptr body) = 0;
-	virtual void handleContent(AMQContentBody::shared_ptr body) = 0;
-	virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0;
-
-        void handleBody(const AMQBody::shared_ptr& body);
-    };
-
-    class UnknownBodyType{
-    public:
-	const u_int16_t type;
-	inline UnknownBodyType(u_int16_t _type) : type(_type){}
-    };
-}
-}
+class AMQRequestBody;
+class AMQResponseBody;
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeartbeatBody;
+
+/**
+ * Base class for client and broker channel handlers.
+ * 
+ * Handles request/response id management common to client and broker.
+ * Derived classes provide remaining client/broker specific handling.
+ */
+class BodyHandler {
+  public:
+    virtual ~BodyHandler();
+    
+    void handleBody(boost::shared_ptr<AMQBody> body);
+
+  protected:
+    virtual void handleRequest(boost::shared_ptr<AMQRequestBody>);
+    virtual void handleResponse(boost::shared_ptr<AMQResponseBody>);
+
+    virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
+    virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
+    virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
+    virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
+
+  protected:
+    /** Throw protocol exception if this is not channel 0. */
+    static void assertChannelZero(u_int16_t id);
+    /** Throw protocol exception if this is channel 0. */
+    static void assertChannelNonZero(u_int16_t id);
+
+  private:
+    Requester requester;
+    Responder responder;
+};
+
+}}
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Wed Jan 17 22:27:50 2007
@@ -135,6 +135,7 @@
     void testConsumerMgmt(){
         Queue::shared_ptr queue(new Queue("my_queue"));
         Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0);
+        channel.open();
         CPPUNIT_ASSERT(!channel.exists("my_consumer"));
 
         ConnectionToken* owner = 0;

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Wed Jan 17 22:27:50 2007
@@ -41,7 +41,6 @@
   ValueTest
 
 framing_tests =		\
-  BodyHandlerTest	\
   FieldTableTest	\
   FramingTest		\
   HeaderTest

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker Wed Jan 17 22:27:50 2007
@@ -8,3 +8,7 @@
 
 # Start the daemon, recording its PID.
 ../src/qpidd > $LOG 2>&1 & echo $! > $PID
+
+# FIXME aconway 2007-01-18: qpidd should not return till it is accepting
+# connections, remove arbitrary sleep.
+sleep 1