You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/17 04:49:36 UTC

svn commit: r496926 - in /incubator/qpid/branches/qpid.0-9/cpp/lib: broker/BrokerAdapter.cpp broker/BrokerAdapter.h broker/BrokerChannel.cpp broker/BrokerChannel.h broker/Connection.cpp broker/Connection.h broker/Makefile.am common/framing/amqp_types.h

Author: aconway
Date: Tue Jan 16 19:49:35 2007
New Revision: 496926

URL: http://svn.apache.org/viewvc?view=rev&rev=496926
Log:

Separated adapter code from Connection class: Extracted all
HandlerImpl classes to BrokerAdapter. The Connection is now part of the
version-invariant core, all version-dependent code is in BrokerAdapter.

The extraction exposes some ugly dependencies between adapter, Connection
and parts of the Broker. More refactoring to follow to improve encapsulation.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=auto&rev=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Jan 16 19:49:35 2007
@@ -0,0 +1,537 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerAdapter.h"
+#include "Connection.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+BrokerAdapter::BrokerAdapter(Connection& c) :
+    connection(c),
+    basicHandler(c),
+    channelHandler(c),
+    connectionHandler(c),
+    exchangeHandler(c),
+    messageHandler(c),
+    queueHandler(c),
+    txHandler(c)    
+{}
+
+typedef qpid::framing::AMQP_ServerOperations Ops;
+
+Ops::ChannelHandler* BrokerAdapter::getChannelHandler() {
+    return &channelHandler;
+}
+Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler() {
+    return &connectionHandler;
+}
+Ops::BasicHandler* BrokerAdapter::getBasicHandler() {
+    return &basicHandler;
+}
+Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler() {
+    return &exchangeHandler;
+}
+Ops::QueueHandler* BrokerAdapter::getQueueHandler() {
+    return &queueHandler;
+}
+Ops::TxHandler* BrokerAdapter::getTxHandler() {
+    return &txHandler; 
+}
+Ops::MessageHandler* BrokerAdapter::getMessageHandler() {
+    return &messageHandler; 
+}
+Ops::AccessHandler* BrokerAdapter::getAccessHandler() {
+    throw ConnectionException(540, "Access class not implemented"); 
+}
+Ops::FileHandler* BrokerAdapter::getFileHandler() {
+    throw ConnectionException(540, "File class not implemented"); 
+}
+Ops::StreamHandler* BrokerAdapter::getStreamHandler() {
+    throw ConnectionException(540, "Stream class not implemented"); 
+}
+Ops::DtxHandler* BrokerAdapter::getDtxHandler() {
+    throw ConnectionException(540, "Dtx class not implemented"); 
+}
+Ops::TunnelHandler* BrokerAdapter::getTunnelHandler() {
+    throw ConnectionException(540, "Tunnel class not implemented");
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+    u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, 
+    const string& /*response*/, const string& /*locale*/){
+    connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
+}
+        
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+        
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+    connection.framemax = framemax;
+    connection.heartbeat = heartbeat;
+}
+        
+void BrokerAdapter::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+    string knownhosts;
+    connection.client->getConnection().openOk(0, knownhosts);
+}
+        
+void BrokerAdapter::ConnectionHandlerImpl::close(
+    u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, 
+    u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
+    connection.client->getConnection().closeOk(0);
+    connection.context->close();
+} 
+        
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+    connection.context->close();
+} 
+              
+void BrokerAdapter::ChannelHandlerImpl::open(
+    u_int16_t channel, const string& /*outOfBand*/){
+    connection.openChannel(channel);
+    // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
+    connection.client->getChannel().openOk(channel, std::string()/* ID */);
+} 
+        
+void BrokerAdapter::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}         
+void BrokerAdapter::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} 
+        
+void BrokerAdapter::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, 
+                                                   u_int16_t /*classId*/, u_int16_t /*methodId*/){
+    connection.closeChannel(channel);
+    connection.client->getChannel().closeOk(channel);
+} 
+        
+void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} 
+              
+
+
+void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, 
+                                                      bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
+                                                      const FieldTable& /*arguments*/){
+
+    if(passive){
+        if(!connection.broker.getExchanges().get(exchange)){
+            throw ChannelException(404, "Exchange not found: " + exchange);            
+        }
+    }else{        
+        try{
+            std::pair<Exchange::shared_ptr, bool> response = connection.broker.getExchanges().declare(exchange, type);
+            if(!response.second && response.first->getType() != type){
+                throw ConnectionException(507, "Exchange already declared to be of type " 
+                                          + response.first->getType() + ", requested " + type);
+            }
+        }catch(UnknownExchangeTypeException& e){
+            throw ConnectionException(503, "Exchange type not implemented: " + type);
+        }
+    }
+    if(!nowait){
+        connection.client->getExchange().declareOk(channel);
+    }
+}
+
+                
+void BrokerAdapter::ExchangeHandlerImpl::unbind(
+    u_int16_t /*channel*/,
+    u_int16_t /*ticket*/,
+    const string& /*queue*/,
+    const string& /*exchange*/,
+    const string& /*routingKey*/,
+    const qpid::framing::FieldTable& /*arguments*/ )
+{
+    assert(0);            // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+
+                
+void BrokerAdapter::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, 
+                                                      const string& exchange, bool /*ifUnused*/, bool nowait){
+
+    //TODO: implement unused
+    connection.broker.getExchanges().destroy(exchange);
+    if(!nowait) connection.client->getExchange().deleteOk(channel);
+} 
+
+void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
+                                                   bool passive, bool durable, bool exclusive, 
+                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+    Queue::shared_ptr queue;
+    if (passive && !name.empty()) {
+	queue = connection.getQueue(name, channel);
+    } else {
+	std::pair<Queue::shared_ptr, bool> queue_created =  
+            connection.broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+	queue = queue_created.first;
+	assert(queue);
+	if (queue_created.second) { // This is a new queue
+	    connection.getChannel(channel).setDefaultQueue(queue);
+
+            //apply settings & create persistent record if required
+            queue_created.first->create(arguments);
+
+	    //add default binding:
+	    connection.broker.getExchanges().getDefault()->bind(queue, name, 0);
+	    if (exclusive) {
+		connection.exclusiveQueues.push_back(queue);
+	    } else if(autoDelete){
+		connection.broker.getCleaner().add(queue);
+	    }
+	}
+    }
+    if (exclusive && !queue->isExclusiveOwner(&connection)) {
+	throw ChannelException(405, "Cannot grant exclusive access to queue");
+    }
+    if (!nowait) {
+        string queueName = queue->getName();
+        connection.client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+    }
+} 
+        
+void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, 
+                                                const string& exchangeName, const string& routingKey, bool nowait, 
+                                                const FieldTable& arguments){
+
+    Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+    Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName);
+    if(exchange){
+        // kpvdr - cannot use this any longer as routingKey is now const
+        //        if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+        //        exchange->bind(queue, routingKey, &arguments);
+        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+        exchange->bind(queue, exchangeRoutingKey, &arguments);
+        if(!nowait) connection.client->getQueue().bindOk(channel);    
+    }else{
+        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+    }
+} 
+        
+void BrokerAdapter::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+
+    Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+    int count = queue->purge();
+    if(!nowait) connection.client->getQueue().purgeOk(channel, count);
+} 
+        
+void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
+                                                   bool ifUnused, bool ifEmpty, bool nowait){
+    ChannelException error(0, "");
+    int count(0);
+    Queue::shared_ptr q = connection.getQueue(queue, channel);
+    if(ifEmpty && q->getMessageCount() > 0){
+        throw ChannelException(406, "Queue not empty.");
+    }else if(ifUnused && q->getConsumerCount() > 0){
+        throw ChannelException(406, "Queue in use.");
+    }else{
+        //remove the queue from the list of exclusive queues if necessary
+        if(q->isExclusiveOwner(&connection)){
+            queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+            if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+        }
+        count = q->getMessageCount();
+        q->destroy();
+        connection.broker.getQueues().destroy(queue);
+    }
+
+    if(!nowait) connection.client->getQueue().deleteOk(channel, count);
+} 
+              
+        
+
+
+void BrokerAdapter::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+    //TODO: handle global
+    connection.getChannel(channel).setPrefetchSize(prefetchSize);
+    connection.getChannel(channel).setPrefetchCount(prefetchCount);
+    connection.client->getBasic().qosOk(channel);
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::consume(
+    u_int16_t channelId, u_int16_t /*ticket*/, 
+    const string& queueName, const string& consumerTag, 
+    bool noLocal, bool noAck, bool exclusive, 
+    bool nowait, const FieldTable& fields)
+{
+    
+    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
+    Channel& channel = connection.getChannel(channelId);
+    if(!consumerTag.empty() && channel.exists(consumerTag)){
+        throw ConnectionException(530, "Consumer tags must be unique");
+    }
+
+    try{
+        string newTag = consumerTag;
+        channel.consume(
+            newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+
+        if(!nowait) connection.client->getBasic().consumeOk(channelId, newTag);
+
+        //allow messages to be dispatched if required as there is now a consumer:
+        queue->dispatch();
+    }catch(ExclusiveAccessException& e){
+        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+        else throw ChannelException(403, "Access would violate previously granted exclusivity");
+    }
+
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+    connection.getChannel(channel).cancel(consumerTag);
+
+    if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag);
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
+                                                   const string& exchangeName, const string& routingKey, 
+                                                   bool mandatory, bool immediate){
+
+    Exchange::shared_ptr exchange = exchangeName.empty() ? connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
+    if(exchange){
+        Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
+        connection.getChannel(channel).handlePublish(msg, exchange);
+    }else{
+        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    }
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
+    if(!connection.getChannel(channelId).get(queue, !noAck)){
+        string clusterId;//not used, part of an imatix hack
+
+        connection.client->getBasic().getEmpty(channelId, clusterId);
+    }
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+    try{
+        connection.getChannel(channel).ack(deliveryTag, multiple);
+    }catch(InvalidAckException& e){
+        throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+    }
+} 
+        
+void BrokerAdapter::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
+        
+void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+    connection.getChannel(channel).recover(requeue);
+} 
+
+void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel){
+    connection.getChannel(channel).begin();
+    connection.client->getTx().selectOk(channel);
+}
+
+void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel){
+    connection.getChannel(channel).commit();
+    connection.client->getTx().commitOk(channel);
+}
+
+void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){
+    
+    connection.getChannel(channel).rollback();
+    connection.client->getTx().rollbackOk(channel);
+    connection.getChannel(channel).recover(false);    
+}
+              
+void
+BrokerAdapter::QueueHandlerImpl::unbind(
+    u_int16_t /*channel*/,
+    u_int16_t /*ticket*/,
+    const string& /*queue*/,
+    const string& /*exchange*/,
+    const string& /*routingKey*/,
+    const qpid::framing::FieldTable& /*arguments*/ )
+{
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+{
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+{
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::resume(
+    u_int16_t /*channel*/,
+    const string& /*channelId*/ )
+{
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+// Message class method handlers
+void
+BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
+                                                const string& /*reference*/,
+                                                const string& /*bytes*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+
+void
+BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+                                                const string& /*destination*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+                                                    const string& /*reference*/,
+                                                    const string& /*identifier*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
+                                               const string& /*reference*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+                                                 u_int16_t /*ticket*/,
+                                                 const string& /*queue*/,
+                                                 const string& /*destination*/,
+                                                 bool /*noLocal*/,
+                                                 bool /*noAck*/,
+                                                 bool /*exclusive*/,
+                                                 const qpid::framing::FieldTable& /*filter*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/,
+                                             u_int16_t /*ticket*/,
+                                             const string& /*queue*/,
+                                             const string& /*destination*/,
+                                             bool /*noAck*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+                                                u_int64_t /*value*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
+                                              const string& /*reference*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+                                             u_int32_t /*prefetchSize*/,
+                                             u_int16_t /*prefetchCount*/,
+                                             bool /*global*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+                                                 bool /*requeue*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+                                                u_int16_t /*code*/,
+                                                const string& /*text*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+                                                const string& /*reference*/,
+                                                const string& /*identifier*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+                                                  u_int16_t /*ticket*/,
+                                                  const string& /*destination*/,
+                                                  bool /*redelivered*/,
+                                                  bool /*immediate*/,
+                                                  u_int64_t /*ttl*/,
+                                                  u_int8_t /*priority*/,
+                                                  u_int64_t /*timestamp*/,
+                                                  u_int8_t /*deliveryMode*/,
+                                                  u_int64_t /*expiration*/,
+                                                  const string& /*exchange*/,
+                                                  const string& /*routingKey*/,
+                                                  const string& /*messageId*/,
+                                                  const string& /*correlationId*/,
+                                                  const string& /*replyTo*/,
+                                                  const string& /*contentType*/,
+                                                  const string& /*contentEncoding*/,
+                                                  const string& /*userId*/,
+                                                  const string& /*appId*/,
+                                                  const string& /*transactionId*/,
+                                                  const string& /*securityToken*/,
+                                                  const qpid::framing::FieldTable& /*applicationHeaders*/,
+                                                  qpid::framing::Content /*body*/ )
+{
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=auto&rev=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Tue Jan 16 19:49:35 2007
@@ -0,0 +1,261 @@
+#ifndef _broker_BrokerAdapter_h
+#define _broker_BrokerAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQP_ServerOperations.h"
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+
+/**
+ * Protocol adapter class for the broker.
+ */
+class BrokerAdapter : public qpid::framing::AMQP_ServerOperations 
+{
+  public:
+    BrokerAdapter(Connection& connection);
+    AccessHandler* getAccessHandler();
+    BasicHandler* getBasicHandler();
+    ChannelHandler* getChannelHandler();
+    ConnectionHandler* getConnectionHandler();
+    DtxHandler* getDtxHandler();
+    ExchangeHandler* getExchangeHandler();
+    FileHandler* getFileHandler();
+    MessageHandler* getMessageHandler();
+    QueueHandler* getQueueHandler();
+    StreamHandler* getStreamHandler();
+    TunnelHandler* getTunnelHandler();
+    TxHandler* getTxHandler();
+
+  private:
+
+    class ConnectionHandlerImpl : public ConnectionHandler{
+        Connection& connection;
+      public:
+        ConnectionHandlerImpl(Connection& c) : connection(c) {}
+
+        void startOk(u_int16_t channel,
+                     const qpid::framing::FieldTable& clientProperties,
+                     const std::string& mechanism, const std::string& response,
+                     const std::string& locale); 
+        void secureOk(u_int16_t channel, const std::string& response); 
+        void tuneOk(u_int16_t channel, u_int16_t channelMax,
+                    u_int32_t frameMax, u_int16_t heartbeat); 
+        void open(u_int16_t channel, const std::string& virtualHost,
+                  const std::string& capabilities, bool insist); 
+        void close(u_int16_t channel, u_int16_t replyCode,
+                   const std::string& replyText,
+                   u_int16_t classId, u_int16_t methodId); 
+        void closeOk(u_int16_t channel); 
+    };
+
+    class ChannelHandlerImpl : public ChannelHandler{
+        Connection& connection;
+      public:
+        ChannelHandlerImpl(Connection& c) : connection(c) {}
+        void open(u_int16_t channel, const std::string& outOfBand); 
+        void flow(u_int16_t channel, bool active); 
+        void flowOk(u_int16_t channel, bool active); 
+        void ok( u_int16_t channel );
+        void ping( u_int16_t channel );
+        void pong( u_int16_t channel );
+        void resume( u_int16_t channel, const std::string& channelId );
+        void close(u_int16_t channel, u_int16_t replyCode, const
+                   std::string& replyText, u_int16_t classId, u_int16_t methodId); 
+        void closeOk(u_int16_t channel); 
+    };
+    
+    class ExchangeHandlerImpl : public ExchangeHandler{
+        Connection& connection;
+      public:
+        ExchangeHandlerImpl(Connection& c) : connection(c) {}
+        void declare(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& type, 
+                     bool passive, bool durable, bool autoDelete,
+                     bool internal, bool nowait, 
+                     const qpid::framing::FieldTable& arguments); 
+        void delete_(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, bool ifUnused, bool nowait); 
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket, const std::string& queue,
+                    const std::string& exchange, const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+    };
+
+    class QueueHandlerImpl : public QueueHandler{
+        Connection& connection;
+      public:
+        QueueHandlerImpl(Connection& c) : connection(c) {}
+        void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                     bool passive, bool durable, bool exclusive, 
+                     bool autoDelete, bool nowait,
+                     const qpid::framing::FieldTable& arguments); 
+        void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                  const std::string& exchange, const std::string& routingKey,
+                  bool nowait, const qpid::framing::FieldTable& arguments); 
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket,
+                    const std::string& queue,
+                    const std::string& exchange,
+                    const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+        void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, 
+                   bool nowait); 
+        void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                     bool ifUnused, bool ifEmpty, 
+                     bool nowait); 
+    };
+
+    class BasicHandlerImpl : public BasicHandler{
+        Connection& connection;
+      public:
+        BasicHandlerImpl(Connection& c) : connection(c) {}
+        void qos(u_int16_t channel, u_int32_t prefetchSize,
+                 u_int16_t prefetchCount, bool global); 
+        void consume(
+            u_int16_t channel, u_int16_t ticket, const std::string& queue,
+            const std::string& consumerTag, bool noLocal, bool noAck,
+            bool exclusive, bool nowait,
+            const qpid::framing::FieldTable& fields); 
+        void cancel(u_int16_t channel, const std::string& consumerTag,
+                    bool nowait); 
+        void publish(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& routingKey, 
+                     bool mandatory, bool immediate); 
+        void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                 bool noAck); 
+        void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
+        void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
+        void recover(u_int16_t channel, bool requeue); 
+    };
+
+    class TxHandlerImpl : public TxHandler{
+        Connection& connection;
+      public:
+        TxHandlerImpl(Connection& c) : connection(c) {}
+        void select(u_int16_t channel);
+        void commit(u_int16_t channel);
+        void rollback(u_int16_t channel);
+    };
+
+    class MessageHandlerImpl : public MessageHandler {
+        Connection& connection;
+      public:
+        MessageHandlerImpl(Connection& c) : connection(c) {}
+
+        void append( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& bytes );
+
+        void cancel( u_int16_t channel,
+                     const std::string& destination );
+
+        void checkpoint( u_int16_t channel,
+                         const std::string& reference,
+                         const std::string& identifier );
+
+        void close( u_int16_t channel,
+                    const std::string& reference );
+
+        void consume( u_int16_t channel,
+                      u_int16_t ticket,
+                      const std::string& queue,
+                      const std::string& destination,
+                      bool noLocal,
+                      bool noAck,
+                      bool exclusive,
+                      const qpid::framing::FieldTable& filter );
+
+        void empty( u_int16_t channel );
+
+        void get( u_int16_t channel,
+                  u_int16_t ticket,
+                  const std::string& queue,
+                  const std::string& destination,
+                  bool noAck );
+
+        void offset( u_int16_t channel,
+                     u_int64_t value );
+
+        void ok( u_int16_t channel );
+
+        void open( u_int16_t channel,
+                   const std::string& reference );
+
+        void qos( u_int16_t channel,
+                  u_int32_t prefetchSize,
+                  u_int16_t prefetchCount,
+                  bool global );
+
+        void recover( u_int16_t channel,
+                      bool requeue );
+
+        void reject( u_int16_t channel,
+                     u_int16_t code,
+                     const std::string& text );
+
+        void resume( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& identifier );
+
+        void transfer( u_int16_t channel,
+                       u_int16_t ticket,
+                       const std::string& destination,
+                       bool redelivered,
+                       bool immediate,
+                       u_int64_t ttl,
+                       u_int8_t priority,
+                       u_int64_t timestamp,
+                       u_int8_t deliveryMode,
+                       u_int64_t expiration,
+                       const std::string& exchange,
+                       const std::string& routingKey,
+                       const std::string& messageId,
+                       const std::string& correlationId,
+                       const std::string& replyTo,
+                       const std::string& contentType,
+                       const std::string& contentEncoding,
+                       const std::string& userId,
+                       const std::string& appId,
+                       const std::string& transactionId,
+                       const std::string& securityToken,
+                       const qpid::framing::FieldTable& applicationHeaders,
+                       qpid::framing::Content body );
+    };
+
+    Connection& connection;
+
+    BasicHandlerImpl basicHandler;
+    ChannelHandlerImpl channelHandler;
+    ConnectionHandlerImpl connectionHandler;
+    ExchangeHandlerImpl exchangeHandler;
+    MessageHandlerImpl messageHandler;
+    QueueHandlerImpl queueHandler;
+    TxHandlerImpl txHandler;
+};
+  
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!_broker_BrokerAdapter_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Jan 16 19:49:35 2007
@@ -42,12 +42,15 @@
     tagGenerator("sgen"),
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
-    version(_version){
+    version(_version),
+    isClosed(false)
+{
 
     outstanding.reset();
 }
 
 Channel::~Channel(){
+    close();
 }
 
 bool Channel::exists(const string& consumerTag){
@@ -83,12 +86,13 @@
 }
 
 void Channel::close(){
-    //cancel all consumers
-    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
-        cancel(i);
+    if (!isClosed) {
+        isClosed = true;
+        while (!consumers.empty()) 
+            cancel(consumers.begin());
+        //requeue:
+        recover(true);
     }
-    //requeue:
-    recover(true);
 }
 
 void Channel::begin(){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Jan 16 19:49:35 2007
@@ -90,6 +90,7 @@
             MessageBuilder messageBuilder;//builder for in-progress message
             Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
 	    qpid::framing::ProtocolVersion version; // version used for this channel
+            bool isClosed;
 
             virtual void complete(Message::shared_ptr& msg);
             void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);            

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Tue Jan 16 19:49:35 2007
@@ -21,11 +21,9 @@
 #include <iostream>
 #include <assert.h>
 
-#include "Connection.h" 
-
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
+#include "Connection.h"
 
+// TODO aconway 2007-01-16: move to channel.
 #include "Requester.h"
 #include "Responder.h"
 
@@ -37,50 +35,24 @@
 namespace qpid {
 namespace broker {
 
-Connection::Connection(
-    SessionContext* _context, Broker& broker) :
-
-    context(_context), 
-    client(0),
-    queues(broker.getQueues()), 
-    exchanges(broker.getExchanges()),
-    cleaner(broker.getCleaner()),
-    settings(broker.getTimeout(), broker.getStagingThreshold()),
+Connection::Connection(SessionContext* context_, Broker& broker_) :
+    adapter(*this),
     requester(broker.getRequester()),
     responder(broker.getResponder()),
-    basicHandler(new BasicHandlerImpl(this)),
-    channelHandler(new ChannelHandlerImpl(this)),
-    connectionHandler(new ConnectionHandlerImpl(this)),
-    exchangeHandler(new ExchangeHandlerImpl(this)),
-    queueHandler(new QueueHandlerImpl(this)),
-    txHandler(new TxHandlerImpl(this)),
-    messageHandler(new MessageHandlerImpl(this)),
+    context(context_), 
     framemax(65536), 
-    heartbeat(0)
+    heartbeat(0),
+    broker(broker_),
+    settings(broker.getTimeout(), broker.getStagingThreshold())
 {}
 
-Connection::~Connection(){
-
-    if (client != NULL)
-    	delete client;
-
-}
-
-Channel* Connection::getChannel(u_int16_t channel){
-    channel_iterator i = channels.find(channel);
-    if(i == channels.end()){
-        throw ConnectionException(504, "Unknown channel: " + channel);
-    }
-    return i->second;
-}
-
 Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
     Queue::shared_ptr queue;
     if (name.empty()) {
-        queue = getChannel(channel)->getDefaultQueue();
+        queue = getChannel(channel).getDefaultQueue();
         if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
     } else {
-        queue = queues.find(name);
+        queue = broker.getQueues().find(name);
         if (queue == 0) {
             throw ChannelException( 404, "Queue not found: " + name);
         }
@@ -90,7 +62,7 @@
 
 
 Exchange::shared_ptr Connection::findExchange(const string& name){
-    return exchanges.get(name);
+    return broker.getExchanges().get(name);
 }
 
 void Connection::handleMethod(
@@ -99,10 +71,9 @@
     AMQMethodBody::shared_ptr method =
         shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
     try{
-        method->invoke(*this, channel);
+        method->invoke(adapter, channel);
     }catch(ChannelException& e){
-        channels[channel]->close();
-        channels.erase(channel);
+        closeChannel(channel);
         client->getChannel().close(
             channel, e.code, e.toString(),
             method->amqpClassId(), method->amqpMethodId());
@@ -172,529 +143,78 @@
     out.send(frame);
 }
 
-void Connection::initiated(qpid::framing::ProtocolInitiation* header){
-
-    if (client == 0)
-    {
-    	client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
-
-
-	std::cout << "---------------" << this << std::endl;
-	  
-    	//send connection start
-    	FieldTable properties;
-    	string mechanisms("PLAIN");
-    	string locales("en_US");    // channel, majour, minor
-      	client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
-    }
+void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
+    if (client.get())
+        // TODO aconway 2007-01-16: correct code.
+        throw ConnectionException(0, "Connection initiated twice");
+
+    client.reset(new qpid::framing::AMQP_ClientProxy(
+                     context, header->getMajor(), header->getMinor()));
+    FieldTable properties;
+    string mechanisms("PLAIN");
+    string locales("en_US");
+    // TODO aconway 2007-01-16: Move to adapter.
+    client->getConnection().start(
+        0, header->getMajor(), header->getMinor(), properties,
+        mechanisms, locales);
 }
 
 
-void Connection::idleOut(){
-
-}
+void Connection::idleOut(){}
 
-void Connection::idleIn(){
-
-}
+void Connection::idleIn(){}
 
 void Connection::closed(){
     try {
-        for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
-            Channel* c = i->second;
-            channels.erase(i);
-            c->close();
-            delete c;
-        }
-        for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
-            string name = (*i)->getName();
-            queues.destroy(name);
-            exclusiveQueues.erase(i);
+        while (!exclusiveQueues.empty()) {
+            broker.getQueues().destroy(exclusiveQueues.front()->getName());
+            exclusiveQueues.erase(exclusiveQueues.begin());
         }
     } catch(std::exception& e) {
-        std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+        std::cout << "Caught unhandled exception while closing session: " <<
+            e.what() << std::endl;
+        assert(0);
     }
 }
 
+// TODO aconway 2007-01-16: colapse these. 
 void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
-    getChannel(channel)->handleHeader(body);
+    getChannel(channel).handleHeader(body);
 }
 
 void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
-    getChannel(channel)->handleContent(body);
+    getChannel(channel).handleContent(body);
 }
 
 void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
     std::cout << "Connection::handleHeartbeat()" << std::endl;
 }
-        
-void Connection::ConnectionHandlerImpl::startOk(
-    u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, 
-    const string& /*response*/, const string& /*locale*/){
-    parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
-}
-        
-void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
-        
-void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
-    parent->framemax = framemax;
-    parent->heartbeat = heartbeat;
-}
-        
-void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
-    string knownhosts;
-    parent->client->getConnection().openOk(0, knownhosts);
-}
-        
-void Connection::ConnectionHandlerImpl::close(
-    u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, 
-    u_int16_t /*classId*/, u_int16_t /*methodId*/)
-{
-    parent->client->getConnection().closeOk(0);
-    parent->context->close();
-} 
-        
-void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
-    parent->context->close();
-} 
-              
-
-
-void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
-
-    
-    parent->channels[channel] = new Channel(
-        parent->client->getProtocolVersion() , parent->context, channel,
-        parent->framemax, parent->queues.getStore(),
-        parent->settings.stagingThreshold);
-
-    // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
-    parent->client->getChannel().openOk(channel, std::string()/* ID */);
-} 
-        
-void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}         
-void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} 
-        
-void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, 
-                                                   u_int16_t /*classId*/, u_int16_t /*methodId*/){
-    Channel* c = parent->getChannel(channel);
-    if(c){
-        parent->channels.erase(channel);
-        c->close();
-        delete c;
-        parent->client->getChannel().closeOk(channel);
-    }
-} 
-        
-void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} 
-              
-
-
-void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, 
-                                                      bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, 
-                                                      const FieldTable& /*arguments*/){
-
-    if(passive){
-        if(!parent->exchanges.get(exchange)){
-            throw ChannelException(404, "Exchange not found: " + exchange);            
-        }
-    }else{        
-        try{
-            std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
-            if(!response.second && response.first->getType() != type){
-                throw ConnectionException(507, "Exchange already declared to be of type " 
-                                          + response.first->getType() + ", requested " + type);
-            }
-        }catch(UnknownExchangeTypeException& e){
-            throw ConnectionException(503, "Exchange type not implemented: " + type);
-        }
-    }
-    if(!nowait){
-        parent->client->getExchange().declareOk(channel);
-    }
-}
-
-                
-void Connection::ExchangeHandlerImpl::unbind(
-    u_int16_t /*channel*/,
-    u_int16_t /*ticket*/,
-    const string& /*queue*/,
-    const string& /*exchange*/,
-    const string& /*routingKey*/,
-    const qpid::framing::FieldTable& /*arguments*/ )
-{
-    assert(0);            // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-
-                
-void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, 
-                                                      const string& exchange, bool /*ifUnused*/, bool nowait){
-
-    //TODO: implement unused
-    parent->exchanges.destroy(exchange);
-    if(!nowait) parent->client->getExchange().deleteOk(channel);
-} 
-
-void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
-                                                   bool passive, bool durable, bool exclusive, 
-                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
-    Queue::shared_ptr queue;
-    if (passive && !name.empty()) {
-	queue = parent->getQueue(name, channel);
-    } else {
-	std::pair<Queue::shared_ptr, bool> queue_created =  
-            parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
-	queue = queue_created.first;
-	assert(queue);
-	if (queue_created.second) { // This is a new queue
-	    parent->getChannel(channel)->setDefaultQueue(queue);
-
-            //apply settings & create persistent record if required
-            queue_created.first->create(arguments);
-
-	    //add default binding:
-	    parent->exchanges.getDefault()->bind(queue, name, 0);
-	    if (exclusive) {
-		parent->exclusiveQueues.push_back(queue);
-	    } else if(autoDelete){
-		parent->cleaner.add(queue);
-	    }
-	}
-    }
-    if (exclusive && !queue->isExclusiveOwner(parent)) {
-	throw ChannelException(405, "Cannot grant exclusive access to queue");
-    }
-    if (!nowait) {
-        string queueName = queue->getName();
-        parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
-    }
-} 
-        
-void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, 
-                                                const string& exchangeName, const string& routingKey, bool nowait, 
-                                                const FieldTable& arguments){
-
-    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
-    Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
-    if(exchange){
-        // kpvdr - cannot use this any longer as routingKey is now const
-        //        if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
-        //        exchange->bind(queue, routingKey, &arguments);
-        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
-        exchange->bind(queue, exchangeRoutingKey, &arguments);
-        if(!nowait) parent->client->getQueue().bindOk(channel);    
-    }else{
-        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
-    }
-} 
-        
-void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
-
-    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
-    int count = queue->purge();
-    if(!nowait) parent->client->getQueue().purgeOk(channel, count);
-} 
-        
-void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
-                                                   bool ifUnused, bool ifEmpty, bool nowait){
-    ChannelException error(0, "");
-    int count(0);
-    Queue::shared_ptr q = parent->getQueue(queue, channel);
-    if(ifEmpty && q->getMessageCount() > 0){
-        throw ChannelException(406, "Queue not empty.");
-    }else if(ifUnused && q->getConsumerCount() > 0){
-        throw ChannelException(406, "Queue in use.");
-    }else{
-        //remove the queue from the list of exclusive queues if necessary
-        if(q->isExclusiveOwner(parent)){
-            queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
-            if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
-        }
-        count = q->getMessageCount();
-        q->destroy();
-        parent->queues.destroy(queue);
-    }
-
-    if(!nowait) parent->client->getQueue().deleteOk(channel, count);
-} 
-              
-        
-
-
-void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
-    //TODO: handle global
-    parent->getChannel(channel)->setPrefetchSize(prefetchSize);
-    parent->getChannel(channel)->setPrefetchCount(prefetchCount);
-    parent->client->getBasic().qosOk(channel);
-} 
-        
-void Connection::BasicHandlerImpl::consume(
-    u_int16_t channelId, u_int16_t /*ticket*/, 
-    const string& queueName, const string& consumerTag, 
-    bool noLocal, bool noAck, bool exclusive, 
-    bool nowait, const FieldTable& fields)
-{
-    
-    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
-    Channel* channel = parent->channels[channelId];
-    if(!consumerTag.empty() && channel->exists(consumerTag)){
-        throw ConnectionException(530, "Consumer tags must be unique");
-    }
-
-    try{
-        string newTag = consumerTag;
-        channel->consume(
-            newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields);
-
-        if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
-
-        //allow messages to be dispatched if required as there is now a consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
-        else throw ChannelException(403, "Access would violate previously granted exclusivity");
-    }
-
-} 
-        
-void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
-    parent->getChannel(channel)->cancel(consumerTag);
-
-    if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
-} 
-        
-void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
-                                                   const string& exchangeName, const string& routingKey, 
-                                                   bool mandatory, bool immediate){
-
-    Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
-    if(exchange){
-        Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
-        parent->getChannel(channel)->handlePublish(msg, exchange);
-    }else{
-        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
-    }
-} 
-        
-void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
-    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
-    if(!parent->getChannel(channelId)->get(queue, !noAck)){
-        string clusterId;//not used, part of an imatix hack
-
-        parent->client->getBasic().getEmpty(channelId, clusterId);
-    }
-} 
-        
-void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
-    try{
-        parent->getChannel(channel)->ack(deliveryTag, multiple);
-    }catch(InvalidAckException& e){
-        throw ConnectionException(530, "Received ack for unrecognised delivery tag");
-    }
-} 
-        
-void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} 
-        
-void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
-    parent->getChannel(channel)->recover(requeue);
-} 
-
-void Connection::TxHandlerImpl::select(u_int16_t channel){
-    parent->getChannel(channel)->begin();
-    parent->client->getTx().selectOk(channel);
-}
-
-void Connection::TxHandlerImpl::commit(u_int16_t channel){
-    parent->getChannel(channel)->commit();
-    parent->client->getTx().commitOk(channel);
-}
-
-void Connection::TxHandlerImpl::rollback(u_int16_t channel){
-    
-    parent->getChannel(channel)->rollback();
-    parent->client->getTx().rollbackOk(channel);
-    parent->getChannel(channel)->recover(false);    
-}
-              
-void
-Connection::QueueHandlerImpl::unbind(
-    u_int16_t /*channel*/,
-    u_int16_t /*ticket*/,
-    const string& /*queue*/,
-    const string& /*exchange*/,
-    const string& /*routingKey*/,
-    const qpid::framing::FieldTable& /*arguments*/ )
-{
-    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::resume(
-    u_int16_t /*channel*/,
-    const string& /*channelId*/ )
-{
-    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-// Message class method handlers
-void
-Connection::MessageHandlerImpl::append( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*bytes*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
-                                                const string& /*destination*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
 
-void
-Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
-                                                    const string& /*reference*/,
-                                                    const string& /*identifier*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+void Connection::openChannel(u_int16_t channel) {
+    if (channel == 0)
+        throw ConnectionException(504, "Illegal channel 0");
+    if (channels.find(channel) != channels.end())
+        throw ConnectionException(504, "Channel already open: " + channel);
+    channels.insert(
+        channel,
+        new Channel(
+            client->getProtocolVersion(), context, channel, framemax,
+            broker.getQueues().getStore(), settings.stagingThreshold));
+}
+
+void Connection::closeChannel(u_int16_t channel) {
+    getChannel(channel).close(); // throws if channel does not exist.
+    channels.erase(channels.find(channel));
 }
 
-void
-Connection::MessageHandlerImpl::close( u_int16_t /*channel*/,
-                                               const string& /*reference*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/,
-                                                 u_int16_t /*ticket*/,
-                                                 const string& /*queue*/,
-                                                 const string& /*destination*/,
-                                                 bool /*noLocal*/,
-                                                 bool /*noAck*/,
-                                                 bool /*exclusive*/,
-                                                 const qpid::framing::FieldTable& /*filter*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
 
-void
-Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::get( u_int16_t /*channel*/,
-                                             u_int16_t /*ticket*/,
-                                             const string& /*queue*/,
-                                             const string& /*destination*/,
-                                             bool /*noAck*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/,
-                                                u_int64_t /*value*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::open( u_int16_t /*channel*/,
-                                              const string& /*reference*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/,
-                                             u_int32_t /*prefetchSize*/,
-                                             u_int16_t /*prefetchCount*/,
-                                             bool /*global*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/,
-                                                 bool /*requeue*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/,
-                                                u_int16_t /*code*/,
-                                                const string& /*text*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+Channel& Connection::getChannel(u_int16_t channel){
+    ChannelMap::iterator i = channels.find(channel);
+    if(i == channels.end())
+        throw ConnectionException(504, "Unknown channel: " + channel);
+    return *i;
 }
 
-void
-Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*identifier*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
 
-void
-Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
-                                                  u_int16_t /*ticket*/,
-                                                  const string& /*destination*/,
-                                                  bool /*redelivered*/,
-                                                  bool /*immediate*/,
-                                                  u_int64_t /*ttl*/,
-                                                  u_int8_t /*priority*/,
-                                                  u_int64_t /*timestamp*/,
-                                                  u_int8_t /*deliveryMode*/,
-                                                  u_int64_t /*expiration*/,
-                                                  const string& /*exchange*/,
-                                                  const string& /*routingKey*/,
-                                                  const string& /*messageId*/,
-                                                  const string& /*correlationId*/,
-                                                  const string& /*replyTo*/,
-                                                  const string& /*contentType*/,
-                                                  const string& /*contentEncoding*/,
-                                                  const string& /*userId*/,
-                                                  const string& /*appId*/,
-                                                  const string& /*transactionId*/,
-                                                  const string& /*securityToken*/,
-                                                  const qpid::framing::FieldTable& /*applicationHeaders*/,
-                                                  qpid::framing::Content /*body*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
- 
 }}
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Tue Jan 16 19:49:35 2007
@@ -21,10 +21,11 @@
 #ifndef _Connection_
 #define _Connection_
 
-#include <map>
 #include <sstream>
 #include <vector>
 
+#include <boost/ptr_container/ptr_map.hpp>
+
 #include <AMQFrame.h>
 #include <AMQP_ClientProxy.h>
 #include <AMQP_ServerOperations.h>
@@ -32,6 +33,7 @@
 #include <sys/ConnectionInputHandler.h>
 #include <sys/TimeoutHandler.h>
 #include "Broker.h"
+#include "BrokerAdapter.h"
 #include "Exception.h"
 
 namespace qpid {
@@ -46,11 +48,13 @@
 };
 
 class Connection : public qpid::sys::ConnectionInputHandler, 
-                           public qpid::framing::AMQP_ServerOperations, 
-                           public ConnectionToken
+                   public ConnectionToken
 {
-    typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+    typedef boost::ptr_map<u_int16_t, Channel> ChannelMap;
+
+    // TODO aconway 2007-01-16: belongs on broker.
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
     class Sender : public qpid::framing::OutputHandler {
       public:
         Sender(qpid::framing::OutputHandler&,
@@ -61,35 +65,44 @@
         qpid::framing::Requester& requester;
         qpid::framing::Responder& responder;
     };
-    
-    qpid::sys::SessionContext* context;
-    qpid::framing::AMQP_ClientProxy* client;
-    QueueRegistry& queues;
-    ExchangeRegistry& exchanges;
-    AutoDelete& cleaner;
-    Settings settings;
+
+    BrokerAdapter adapter;
+    // FIXME aconway 2007-01-16: On Channel
     qpid::framing::Requester& requester;
     qpid::framing::Responder& responder;
-    std::auto_ptr<BasicHandler> basicHandler;
-    std::auto_ptr<ChannelHandler> channelHandler;
-    std::auto_ptr<ConnectionHandler> connectionHandler;
-    std::auto_ptr<ExchangeHandler> exchangeHandler;
-    std::auto_ptr<QueueHandler> queueHandler;
-    std::auto_ptr<TxHandler> txHandler;
-    std::auto_ptr<MessageHandler> messageHandler;
+    ChannelMap channels;
 
-    std::map<u_int16_t, Channel*> channels;
-    std::vector<Queue::shared_ptr> exclusiveQueues;
+    void handleHeader(u_int16_t channel,
+                      qpid::framing::AMQHeaderBody::shared_ptr body);
+    void handleContent(u_int16_t channel,
+                       qpid::framing::AMQContentBody::shared_ptr body);
+    void handleMethod(u_int16_t channel,
+                      qpid::framing::AMQBody::shared_ptr body);
+    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
 
+    // FIXME aconway 2007-01-16: on broker.
+    Exchange::shared_ptr findExchange(const string& name);
+    
+  public:
+    Connection(qpid::sys::SessionContext* context, Broker& broker);
+    // ConnectionInputHandler methods
+    void received(qpid::framing::AMQFrame* frame);
+    void initiated(qpid::framing::ProtocolInitiation* header);
+    void idleOut();
+    void idleIn();
+    void closed();
+
+    // FIXME aconway 2007-01-16: encapsulate.
+    qpid::sys::SessionContext* context;
     u_int32_t framemax;
     u_int16_t heartbeat;
+    Broker& broker;
+    std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
+    Settings settings;
+    // FIXME aconway 2007-01-16: Belongs on broker?
+    std::vector<Queue::shared_ptr> exclusiveQueues;
 
-    void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
-    void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
-    void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body);
-    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
-    Channel* getChannel(u_int16_t channel);
+    // FIXME aconway 2007-01-16: move to broker.
     /**
      * Get named queue, never returns 0.
      * @return: named queue or default queue for channel if name=""
@@ -98,261 +111,10 @@
      */
     Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
 
-    Exchange::shared_ptr findExchange(const string& name);
-    
-  public:
-    Connection(qpid::sys::SessionContext* context, Broker& broker);
-    virtual void received(qpid::framing::AMQFrame* frame);
-    virtual void initiated(qpid::framing::ProtocolInitiation* header);
-    virtual void idleOut();
-    virtual void idleIn();
-    virtual void closed();
-    virtual ~Connection();
-
-    class ConnectionHandlerImpl : public ConnectionHandler{
-        Connection* parent;
-      public:
-        inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {}
-
-        virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, 
-                             const string& response, const string& locale); 
-                
-        virtual void secureOk(u_int16_t channel, const string& response); 
-                
-        virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); 
-                
-        virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); 
-                
-        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, 
-                           u_int16_t methodId); 
- 
-        virtual void closeOk(u_int16_t channel); 
-                
-        virtual ~ConnectionHandlerImpl(){}
-    };
-    
-    class ChannelHandlerImpl : public ChannelHandler{
-        Connection* parent;
-      public:
-        inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {}
-        
-        virtual void open(u_int16_t channel, const string& outOfBand); 
-        
-        virtual void flow(u_int16_t channel, bool active); 
-                
-        virtual void flowOk(u_int16_t channel, bool active); 
-                
-        virtual void ok( u_int16_t channel );
-
-        virtual void ping( u_int16_t channel );
-
-        virtual void pong( u_int16_t channel );
-
-        virtual void resume( u_int16_t channel,
-                            const string& channelId );
-        
-        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, 
-                           u_int16_t classId, u_int16_t methodId); 
-                
-        virtual void closeOk(u_int16_t channel); 
-                
-        virtual ~ChannelHandlerImpl(){}
-    };
-    
-    class ExchangeHandlerImpl : public ExchangeHandler{
-        Connection* parent;
-      public:
-        inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {}
-        
-        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, 
-                             bool passive, bool durable, bool autoDelete, bool internal, bool nowait, 
-                             const qpid::framing::FieldTable& arguments); 
-                
-        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); 
-                
-        virtual void unbind(u_int16_t channel,
-                            u_int16_t ticket,
-                            const string& queue,
-                            const string& exchange,
-                            const string& routingKey,
-                            const qpid::framing::FieldTable& arguments );
-
-        virtual ~ExchangeHandlerImpl(){}
-    };
-
-    
-    class QueueHandlerImpl : public QueueHandler{
-        Connection* parent;
-      public:
-        inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {}
-        
-        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, 
-                             bool passive, bool durable, bool exclusive, 
-                             bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); 
-                
-        virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, 
-                          const string& exchange, const string& routingKey, bool nowait, 
-                          const qpid::framing::FieldTable& arguments); 
-
-        virtual void unbind(u_int16_t channel,
-                            u_int16_t ticket,
-                            const string& queue,
-                            const string& exchange,
-                            const string& routingKey,
-                            const qpid::framing::FieldTable& arguments );
-
-        virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, 
-                           bool nowait); 
-                
-        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, 
-                             bool nowait); 
-
-        virtual ~QueueHandlerImpl(){}
-    };
-
-    class BasicHandlerImpl : public BasicHandler{
-        Connection* parent;
-      public:
-        inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {}
-        
-        virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); 
-
-        virtual void consume(
-            u_int16_t channel, u_int16_t ticket, const string& queue,
-            const string& consumerTag, bool noLocal, bool noAck,
-            bool exclusive, bool nowait,
-            const qpid::framing::FieldTable& fields); 
-        
-        virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); 
-                
-        virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, 
-                             bool mandatory, bool immediate); 
-                
-        virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); 
-                
-        virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
-                
-        virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
-                
-        virtual void recover(u_int16_t channel, bool requeue); 
-                
-        virtual ~BasicHandlerImpl(){}
-    };
-
-    class TxHandlerImpl : public TxHandler{
-        Connection* parent;
-      public:
-        TxHandlerImpl(Connection* _parent) : parent(_parent) {}
-        virtual ~TxHandlerImpl() {}
-        virtual void select(u_int16_t channel);
-        virtual void commit(u_int16_t channel);
-        virtual void rollback(u_int16_t channel);
-    };
-
-    class MessageHandlerImpl : public MessageHandler {
-        Connection* parent;
-
-        // Constructors and destructors
-
-      public:
-        MessageHandlerImpl() {}
-        MessageHandlerImpl(Connection* _parent) : parent(_parent) {}
-        virtual ~MessageHandlerImpl() {}
-
-        // Protocol methods
-        virtual void append( u_int16_t channel,
-                            const string& reference,
-                            const string& bytes );
-
-        virtual void cancel( u_int16_t channel,
-                            const string& destination );
-
-        virtual void checkpoint( u_int16_t channel,
-                            const string& reference,
-                            const string& identifier );
-
-        virtual void close( u_int16_t channel,
-                            const string& reference );
-
-        virtual void consume( u_int16_t channel,
-                            u_int16_t ticket,
-                            const string& queue,
-                            const string& destination,
-                            bool noLocal,
-                            bool noAck,
-                            bool exclusive,
-                            const qpid::framing::FieldTable& filter );
-
-        virtual void empty( u_int16_t channel );
-
-        virtual void get( u_int16_t channel,
-                            u_int16_t ticket,
-                            const string& queue,
-                            const string& destination,
-                            bool noAck );
-
-        virtual void offset( u_int16_t channel,
-                            u_int64_t value );
-
-        virtual void ok( u_int16_t channel );
-
-        virtual void open( u_int16_t channel,
-                            const string& reference );
-
-        virtual void qos( u_int16_t channel,
-                            u_int32_t prefetchSize,
-                            u_int16_t prefetchCount,
-                            bool global );
-
-        virtual void recover( u_int16_t channel,
-                            bool requeue );
-
-        virtual void reject( u_int16_t channel,
-                            u_int16_t code,
-                            const string& text );
-
-        virtual void resume( u_int16_t channel,
-                            const string& reference,
-                            const string& identifier );
-
-        virtual void transfer( u_int16_t channel,
-                            u_int16_t ticket,
-                            const string& destination,
-                            bool redelivered,
-                            bool immediate,
-                            u_int64_t ttl,
-                            u_int8_t priority,
-                            u_int64_t timestamp,
-                            u_int8_t deliveryMode,
-                            u_int64_t expiration,
-                            const string& exchange,
-                            const string& routingKey,
-                            const string& messageId,
-                            const string& correlationId,
-                            const string& replyTo,
-                            const string& contentType,
-                            const string& contentEncoding,
-                            const string& userId,
-                            const string& appId,
-                            const string& transactionId,
-                            const string& securityToken,
-                            const qpid::framing::FieldTable& applicationHeaders,
-                            qpid::framing::Content body );
-    };
 
-    virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
-    virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
-    virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
-    virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
-    virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
-    virtual TxHandler* getTxHandler(){ return txHandler.get(); }       
-    virtual MessageHandler* getMessageHandler(){ return messageHandler.get(); } 
- 
-    virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }       
-    virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }       
-    virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }       
-    virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }       
-    virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } 
+    void openChannel(u_int16_t channel);
+    void closeChannel(u_int16_t channel);
+    Channel& getChannel(u_int16_t channel);
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am Tue Jan 16 19:49:35 2007
@@ -65,10 +65,12 @@
   QueueRegistry.h				\
   RecoveryManager.cpp				\
   RecoveryManager.h				\
-  ConnectionFactory.cpp			\
-  ConnectionFactory.h			\
-  Connection.cpp			\
-  Connection.h				\
+  ConnectionFactory.cpp				\
+  ConnectionFactory.h				\
+  Connection.cpp				\
+  Connection.h					\
+  BrokerAdapter.cpp				\
+  BrokerAdapter.h				\
   TopicExchange.cpp				\
   TopicExchange.h				\
   TransactionalStore.h				\

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h Tue Jan 16 19:49:35 2007
@@ -36,7 +36,7 @@
 namespace framing {
 
 using std::string;
-
+typedef u_int16_t ChannelId;
 typedef u_int64_t RequestId;
 typedef u_int64_t ResponseId;
 typedef u_int32_t BatchOffset;