You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/17 04:49:36 UTC
svn commit: r496926 - in /incubator/qpid/branches/qpid.0-9/cpp/lib:
broker/BrokerAdapter.cpp broker/BrokerAdapter.h broker/BrokerChannel.cpp
broker/BrokerChannel.h broker/Connection.cpp broker/Connection.h
broker/Makefile.am common/framing/amqp_types.h
Author: aconway
Date: Tue Jan 16 19:49:35 2007
New Revision: 496926
URL: http://svn.apache.org/viewvc?view=rev&rev=496926
Log:
Separated adapter code from Connection class: Extracted all
HandlerImpl classes to BrokerAdapter. The Connection is now part of the
version-invariant core, all version-dependent code is in BrokerAdapter.
The extraction exposes some ugly dependencies between adapter, Connection
and parts of the Broker. More refactoring to follow to improve encapsulation.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=auto&rev=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Jan 16 19:49:35 2007
@@ -0,0 +1,537 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerAdapter.h"
+#include "Connection.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+/**
+ * Construct the adapter for one Connection. The same Connection reference
+ * is kept here and handed to every per-class handler member.
+ */
+BrokerAdapter::BrokerAdapter(Connection& c)
+    : connection(c),
+      basicHandler(c), channelHandler(c), connectionHandler(c),
+      exchangeHandler(c), messageHandler(c), queueHandler(c), txHandler(c)
+{
+}
+
+typedef qpid::framing::AMQP_ServerOperations Ops;
+
+// Accessors for the protocol classes this adapter implements. Each one
+// returns the address of the matching handler member, so the pointer stays
+// valid only as long as this BrokerAdapter does.
+
+Ops::ChannelHandler* BrokerAdapter::getChannelHandler()
+{
+    return &channelHandler;
+}
+
+Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler()
+{
+    return &connectionHandler;
+}
+
+Ops::BasicHandler* BrokerAdapter::getBasicHandler()
+{
+    return &basicHandler;
+}
+
+Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler()
+{
+    return &exchangeHandler;
+}
+
+Ops::QueueHandler* BrokerAdapter::getQueueHandler()
+{
+    return &queueHandler;
+}
+
+Ops::TxHandler* BrokerAdapter::getTxHandler()
+{
+    return &txHandler;
+}
+
+Ops::MessageHandler* BrokerAdapter::getMessageHandler()
+{
+    return &messageHandler;
+}
+// Protocol classes with no handler in this adapter. Asking for one never
+// returns: it raises ConnectionException with reply code 540.
+
+Ops::AccessHandler* BrokerAdapter::getAccessHandler()
+{
+    throw ConnectionException(540, "Access class not implemented");
+}
+
+Ops::FileHandler* BrokerAdapter::getFileHandler()
+{
+    throw ConnectionException(540, "File class not implemented");
+}
+
+Ops::StreamHandler* BrokerAdapter::getStreamHandler()
+{
+    throw ConnectionException(540, "Stream class not implemented");
+}
+
+Ops::DtxHandler* BrokerAdapter::getDtxHandler()
+{
+    throw ConnectionException(540, "Dtx class not implemented");
+}
+
+Ops::TunnelHandler* BrokerAdapter::getTunnelHandler()
+{
+    throw ConnectionException(540, "Tunnel class not implemented");
+}
+
+/**
+ * Handles Connection.StartOk. Every argument is ignored; the reply is a
+ * Connection.Tune sent on channel 0 carrying the literal 100 (presumably
+ * channel-max, going by tuneOk's parameter order -- TODO confirm) plus the
+ * connection's current framemax and heartbeat.
+ */
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+    u_int16_t /*channel*/,
+    const FieldTable& /*clientProperties*/,
+    const string& /*mechanism*/,
+    const string& /*response*/,
+    const string& /*locale*/)
+{
+    connection.client->getConnection().tune(
+        0, 100, connection.framemax, connection.heartbeat);
+}
+
+/** Handles Connection.SecureOk. Currently a no-op: both arguments are ignored. */
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
+    u_int16_t /*channel*/, const string& /*response*/)
+{
+}
+
+/**
+ * Handles Connection.TuneOk by storing the frame size limit and heartbeat
+ * value received from the peer on the Connection. channelmax is ignored.
+ */
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+    u_int16_t /*channel*/, u_int16_t /*channelmax*/,
+    u_int32_t framemax, u_int16_t heartbeat)
+{
+    connection.heartbeat = heartbeat;
+    connection.framemax = framemax;
+}
+
+/**
+ * Handles Connection.Open. The virtual host, capabilities and insist flag
+ * are ignored; the broker always answers Connection.OpenOk on channel 0
+ * with an empty known-hosts string.
+ */
+void BrokerAdapter::ConnectionHandlerImpl::open(
+    u_int16_t /*channel*/, const string& /*virtualHost*/,
+    const string& /*capabilities*/, bool /*insist*/)
+{
+    string noKnownHosts;  // left empty on purpose
+    connection.client->getConnection().openOk(0, noKnownHosts);
+}
+
+/**
+ * Handles a Connection.Close sent by the peer: acknowledges it with
+ * Connection.CloseOk on channel 0 and then closes the session context.
+ * The reply code/text and class/method ids are ignored.
+ */
+void BrokerAdapter::ConnectionHandlerImpl::close(
+    u_int16_t /*channel*/,
+    u_int16_t /*replyCode*/,
+    const string& /*replyText*/,
+    u_int16_t /*classId*/,
+    u_int16_t /*methodId*/)
+{
+    // Acknowledge first, then drop the context.
+    connection.client->getConnection().closeOk(0);
+    connection.context->close();
+}
+
+/** Handles Connection.CloseOk from the peer by closing the session context. */
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+    connection.context->close();
+}
+
+/**
+ * Handles Channel.Open: opens the channel on the Connection and replies
+ * with Channel.OpenOk. The out-of-band argument is ignored.
+ */
+void BrokerAdapter::ChannelHandlerImpl::open(
+    u_int16_t channel, const string& /*outOfBand*/)
+{
+    connection.openChannel(channel);
+    // FIXME aconway 2007-01-04: provide valid channel Id as per AMQP 0-9;
+    // an empty string is sent as the ID for now.
+    connection.client->getChannel().openOk(channel, std::string());
+}
+
+/** Handles Channel.Flow. Currently a no-op: both arguments are ignored. */
+void BrokerAdapter::ChannelHandlerImpl::flow(
+    u_int16_t /*channel*/, bool /*active*/)
+{
+}
+
+/** Handles Channel.FlowOk. Currently a no-op: both arguments are ignored. */
+void BrokerAdapter::ChannelHandlerImpl::flowOk(
+    u_int16_t /*channel*/, bool /*active*/)
+{
+}
+
+/**
+ * Handles Channel.Close from the peer: closes the channel on the
+ * Connection, then acknowledges with Channel.CloseOk. The reply code/text
+ * and class/method ids are ignored.
+ */
+void BrokerAdapter::ChannelHandlerImpl::close(
+    u_int16_t channel,
+    u_int16_t /*replyCode*/,
+    const string& /*replyText*/,
+    u_int16_t /*classId*/,
+    u_int16_t /*methodId*/)
+{
+    connection.closeChannel(channel);
+    connection.client->getChannel().closeOk(channel);
+}
+
+/** Handles Channel.CloseOk. Currently a no-op. */
+void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+}
+
+
+
+/**
+ * Handles Exchange.Declare.
+ *
+ * A passive declare only checks that the exchange exists. Otherwise the
+ * exchange is declared in the broker's exchange registry; if it already
+ * existed with a different type the declare is rejected. The ticket,
+ * durable, autoDelete, internal and arguments parameters are ignored.
+ * Exchange.DeclareOk is sent unless nowait is set.
+ *
+ * @throws ChannelException (404) passive declare of an unknown exchange.
+ * @throws ConnectionException (507) existing exchange has another type.
+ * @throws ConnectionException (503) the requested type is not known.
+ */
+void BrokerAdapter::ExchangeHandlerImpl::declare(
+    u_int16_t channel, u_int16_t /*ticket*/,
+    const string& exchange, const string& type,
+    bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/,
+    bool nowait, const FieldTable& /*arguments*/)
+{
+    if (!passive) {
+        try {
+            std::pair<Exchange::shared_ptr, bool> declared =
+                connection.broker.getExchanges().declare(exchange, type);
+            // .second is false when the exchange was already there.
+            const bool alreadyExisted = !declared.second;
+            if (alreadyExisted && declared.first->getType() != type) {
+                throw ConnectionException(507, "Exchange already declared to be of type "
+                                          + declared.first->getType() + ", requested " + type);
+            }
+        } catch (const UnknownExchangeTypeException&) {
+            throw ConnectionException(503, "Exchange type not implemented: " + type);
+        }
+    } else if (!connection.broker.getExchanges().get(exchange)) {
+        throw ChannelException(404, "Exchange not found: " + exchange);
+    }
+
+    if (nowait) return;
+    connection.client->getExchange().declareOk(channel);
+}
+
+
+/**
+ * Handles Exchange.Unbind, a 0-9 feature that is not implemented yet.
+ *
+ * Raises ConnectionException(540), like the unimplemented get*Handler
+ * accessors in this file, instead of assert(0): the assert aborts the
+ * broker in debug builds and compiles to nothing under NDEBUG, in which
+ * case the request would be dropped without any reply.
+ *
+ * @throws ConnectionException always.
+ */
+void BrokerAdapter::ExchangeHandlerImpl::unbind(
+    u_int16_t /*channel*/,
+    u_int16_t /*ticket*/,
+    const string& /*queue*/,
+    const string& /*exchange*/,
+    const string& /*routingKey*/,
+    const qpid::framing::FieldTable& /*arguments*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Exchange.Unbind not implemented");
+}
+
+
+
+/**
+ * Handles Exchange.Delete: destroys the named exchange in the broker's
+ * exchange registry and sends Exchange.DeleteOk unless nowait is set.
+ * The ticket and ifUnused parameters are ignored.
+ */
+void BrokerAdapter::ExchangeHandlerImpl::delete_(
+    u_int16_t channel, u_int16_t /*ticket*/,
+    const string& exchange, bool /*ifUnused*/, bool nowait)
+{
+    //TODO: implement unused
+    connection.broker.getExchanges().destroy(exchange);
+
+    if (nowait) return;
+    connection.client->getExchange().deleteOk(channel);
+}
+
+/**
+ * Handles Queue.Declare.
+ *
+ * A passive declare with a non-empty name only looks the queue up. In
+ * every other case the queue is declared in the broker's queue registry.
+ * A newly created queue becomes the channel's default queue, has its
+ * settings applied, is bound to the default exchange under its own name,
+ * and is then either recorded as an exclusive queue of this connection or
+ * (for autoDelete) handed to the broker's cleaner.
+ * Queue.DeclareOk is sent unless nowait is set.
+ *
+ * @throws ChannelException (405) exclusive access was requested but this
+ *         connection is not the queue's exclusive owner.
+ */
+void BrokerAdapter::QueueHandlerImpl::declare(
+    u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+    bool passive, bool durable, bool exclusive,
+    bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments)
+{
+    Queue::shared_ptr queue;
+    const bool lookupOnly = passive && !name.empty();
+    if (lookupOnly) {
+        queue = connection.getQueue(name, channel);
+    } else {
+        std::pair<Queue::shared_ptr, bool> declared =
+            connection.broker.getQueues().declare(
+                name, durable,
+                autoDelete ? connection.settings.timeout : 0,
+                exclusive ? &connection : 0);
+        queue = declared.first;
+        assert(queue);
+
+        const bool isNewQueue = declared.second;
+        if (isNewQueue) {
+            connection.getChannel(channel).setDefaultQueue(queue);
+
+            // Apply settings and create the persistent record if required.
+            queue->create(arguments);
+
+            // Default binding: default exchange, keyed by the queue name.
+            connection.broker.getExchanges().getDefault()->bind(queue, name, 0);
+
+            if (exclusive) {
+                connection.exclusiveQueues.push_back(queue);
+            } else if (autoDelete) {
+                connection.broker.getCleaner().add(queue);
+            }
+        }
+    }
+
+    if (exclusive && !queue->isExclusiveOwner(&connection)) {
+        throw ChannelException(405, "Cannot grant exclusive access to queue");
+    }
+
+    if (!nowait) {
+        string queueName = queue->getName();
+        connection.client->getQueue().declareOk(
+            channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+    }
+}
+
+/**
+ * Handles Queue.Bind: binds the queue (or the channel's default queue when
+ * queueName is empty, as resolved by Connection::getQueue) to the named
+ * exchange and sends Queue.BindOk unless nowait is set.
+ *
+ * When both routingKey and queueName are empty, the queue's own name is
+ * used as the binding key.
+ *
+ * @throws ChannelException (404) when the exchange does not exist.
+ */
+void BrokerAdapter::QueueHandlerImpl::bind(
+    u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+    const string& exchangeName, const string& routingKey, bool nowait,
+    const FieldTable& arguments)
+{
+    Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+    Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName);
+    if (!exchange) {
+        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+    }
+
+    // kpvdr - routingKey is const, so the effective key lives in a local
+    // copy instead of being assigned back to the parameter.
+    string bindingKey = routingKey;
+    if (routingKey.empty() && queueName.empty()) {
+        bindingKey = queue->getName();
+    }
+    exchange->bind(queue, bindingKey, &arguments);
+
+    if (!nowait) connection.client->getQueue().bindOk(channel);
+}
+
+/**
+ * Handles Queue.Purge: purges the queue and, unless nowait is set, reports
+ * the value returned by Queue::purge() in Queue.PurgeOk.
+ */
+void BrokerAdapter::QueueHandlerImpl::purge(
+    u_int16_t channel, u_int16_t /*ticket*/,
+    const string& queueName, bool nowait)
+{
+    Queue::shared_ptr target = connection.getQueue(queueName, channel);
+    int purged = target->purge();
+
+    if (nowait) return;
+    connection.client->getQueue().purgeOk(channel, purged);
+}
+
+/**
+ * Handles Queue.Delete.
+ *
+ * Refuses the delete when ifEmpty is set and the queue still holds
+ * messages, or when ifUnused is set and it still has consumers. Otherwise
+ * the queue is dropped from this connection's exclusive-queue list (if
+ * this connection is its exclusive owner), destroyed, and removed from the
+ * broker's queue registry. Queue.DeleteOk carries the message count read
+ * just before destruction and is sent unless nowait is set.
+ *
+ * @throws ChannelException (406) when an ifEmpty / ifUnused check fails.
+ */
+void BrokerAdapter::QueueHandlerImpl::delete_(
+    u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+    bool ifUnused, bool ifEmpty, bool nowait)
+{
+    Queue::shared_ptr q = connection.getQueue(queue, channel);
+    if (ifEmpty && q->getMessageCount() > 0) {
+        throw ChannelException(406, "Queue not empty.");
+    }
+    if (ifUnused && q->getConsumerCount() > 0) {
+        throw ChannelException(406, "Queue in use.");
+    }
+
+    // Remove the queue from the list of exclusive queues if necessary.
+    if (q->isExclusiveOwner(&connection)) {
+        queue_iterator i = std::find(connection.exclusiveQueues.begin(),
+                                     connection.exclusiveQueues.end(), q);
+        // Compare against end() with != : that is the check valid for any
+        // iterator category, not just random-access ones.
+        if (i != connection.exclusiveQueues.end()) {
+            connection.exclusiveQueues.erase(i);
+        }
+    }
+
+    // Read the count before destroy() so DeleteOk reports what was dropped.
+    int count = q->getMessageCount();
+    q->destroy();
+    connection.broker.getQueues().destroy(queue);
+
+    if (!nowait) connection.client->getQueue().deleteOk(channel, count);
+}
+
+
+
+
+/**
+ * Handles Basic.Qos: stores the prefetch size and count on the channel and
+ * replies with Basic.QosOk. The global flag is ignored.
+ */
+void BrokerAdapter::BasicHandlerImpl::qos(
+    u_int16_t channel, u_int32_t prefetchSize,
+    u_int16_t prefetchCount, bool /*global*/)
+{
+    //TODO: handle global
+    Channel& target = connection.getChannel(channel);
+    target.setPrefetchSize(prefetchSize);
+    target.setPrefetchCount(prefetchCount);
+    connection.client->getBasic().qosOk(channel);
+}
+
+/**
+ * Handles Basic.Consume: registers a consumer for the queue on the given
+ * channel, sends Basic.ConsumeOk (unless nowait) with the tag passed
+ * through Channel::consume, and then asks the queue to dispatch.
+ *
+ * @throws ConnectionException (530) a non-empty consumerTag is already in
+ *         use on the channel.
+ * @throws ChannelException (403) Channel::consume reported an exclusive
+ *         access conflict.
+ */
+void BrokerAdapter::BasicHandlerImpl::consume(
+    u_int16_t channelId, u_int16_t /*ticket*/,
+    const string& queueName, const string& consumerTag,
+    bool noLocal, bool noAck, bool exclusive,
+    bool nowait, const FieldTable& fields)
+{
+    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+    Channel& ch = connection.getChannel(channelId);
+
+    const bool tagTaken = !consumerTag.empty() && ch.exists(consumerTag);
+    if (tagTaken) {
+        throw ConnectionException(530, "Consumer tags must be unique");
+    }
+
+    try {
+        // Passed as a mutable copy; the value after consume() is what gets
+        // reported back in ConsumeOk.
+        string newTag = consumerTag;
+        ch.consume(newTag, queue, !noAck, exclusive,
+                   noLocal ? &connection : 0, &fields);
+
+        if (!nowait) connection.client->getBasic().consumeOk(channelId, newTag);
+
+        // There is now a consumer, so let pending messages be dispatched.
+        queue->dispatch();
+    } catch (const ExclusiveAccessException&) {
+        throw ChannelException(
+            403,
+            exclusive ? "Exclusive access cannot be granted"
+                      : "Access would violate previously granted exclusivity");
+    }
+}
+
+/**
+ * Handles Basic.Cancel: cancels the consumer with the given tag on the
+ * channel and sends Basic.CancelOk unless nowait is set.
+ */
+void BrokerAdapter::BasicHandlerImpl::cancel(
+    u_int16_t channel, const string& consumerTag, bool nowait)
+{
+    connection.getChannel(channel).cancel(consumerTag);
+
+    if (nowait) return;
+    connection.client->getBasic().cancelOk(channel, consumerTag);
+}
+
+/**
+ * Handles Basic.Publish: resolves the target exchange (the default
+ * exchange when exchangeName is empty), creates a Message for it and hands
+ * both to the channel's handlePublish.
+ *
+ * @throws ChannelException (404) when the named exchange does not exist.
+ */
+void BrokerAdapter::BasicHandlerImpl::publish(
+    u_int16_t channel, u_int16_t /*ticket*/,
+    const string& exchangeName, const string& routingKey,
+    bool mandatory, bool immediate)
+{
+    Exchange::shared_ptr exchange;
+    if (exchangeName.empty()) {
+        exchange = connection.broker.getExchanges().getDefault();
+    } else {
+        exchange = connection.broker.getExchanges().get(exchangeName);
+    }
+    if (!exchange) {
+        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    }
+
+    // NOTE(review): raw pointer is passed on to handlePublish; ownership is
+    // presumably taken there -- confirm against Channel::handlePublish.
+    Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
+    connection.getChannel(channel).handlePublish(msg, exchange);
+}
+
+/**
+ * Handles Basic.Get: asks the channel to get a message from the queue.
+ * When Channel::get returns false, Basic.GetEmpty is sent instead.
+ */
+void BrokerAdapter::BasicHandlerImpl::get(
+    u_int16_t channelId, u_int16_t /*ticket*/,
+    const string& queueName, bool noAck)
+{
+    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+    const bool delivered = connection.getChannel(channelId).get(queue, !noAck);
+    if (delivered) return;
+
+    string clusterId;  // not used, part of an imatix hack
+    connection.client->getBasic().getEmpty(channelId, clusterId);
+}
+
+/**
+ * Handles Basic.Ack by forwarding the acknowledgement to the channel.
+ *
+ * @throws ConnectionException (530) when the channel rejects the delivery
+ *         tag with InvalidAckException.
+ */
+void BrokerAdapter::BasicHandlerImpl::ack(
+    u_int16_t channel, u_int64_t deliveryTag, bool multiple)
+{
+    try {
+        connection.getChannel(channel).ack(deliveryTag, multiple);
+    } catch (const InvalidAckException&) {
+        throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+    }
+}
+
+/** Handles Basic.Reject. Currently a no-op: all arguments are ignored. */
+void BrokerAdapter::BasicHandlerImpl::reject(
+    u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/)
+{
+}
+
+/** Handles Basic.Recover by forwarding the requeue flag to the channel. */
+void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue)
+{
+    Channel& target = connection.getChannel(channel);
+    target.recover(requeue);
+}
+
+/** Handles Tx.Select: calls begin() on the channel, then sends Tx.SelectOk. */
+void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel)
+{
+    Channel& target = connection.getChannel(channel);
+    target.begin();
+    connection.client->getTx().selectOk(channel);
+}
+
+/** Handles Tx.Commit: calls commit() on the channel, then sends Tx.CommitOk. */
+void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel)
+{
+    Channel& target = connection.getChannel(channel);
+    target.commit();
+    connection.client->getTx().commitOk(channel);
+}
+
+/**
+ * Handles Tx.Rollback: rolls the channel back, sends Tx.RollbackOk, and
+ * only then calls recover(false) on the channel. The order of these three
+ * steps is kept exactly as is.
+ */
+void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel)
+{
+    connection.getChannel(channel).rollback();
+    connection.client->getTx().rollbackOk(channel);
+    // Channel is looked up again after the reply has been sent.
+    connection.getChannel(channel).recover(false);
+}
+
+/**
+ * Handles Queue.Unbind, a 0-9 feature that is not implemented yet.
+ *
+ * Raises ConnectionException(540), like the unimplemented get*Handler
+ * accessors in this file, instead of assert(0): the assert aborts the
+ * broker in debug builds and compiles to nothing under NDEBUG, in which
+ * case the request would be dropped without any reply.
+ *
+ * @throws ConnectionException always.
+ */
+void
+BrokerAdapter::QueueHandlerImpl::unbind(
+    u_int16_t /*channel*/,
+    u_int16_t /*ticket*/,
+    const string& /*queue*/,
+    const string& /*exchange*/,
+    const string& /*routingKey*/,
+    const qpid::framing::FieldTable& /*arguments*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Queue.Unbind not implemented");
+}
+
+// Channel-class methods added in 0-9 that are not implemented yet. Each one
+// raises ConnectionException(540), like the unimplemented get*Handler
+// accessors in this file, instead of assert(0): the assert aborts the broker
+// in debug builds and compiles to nothing under NDEBUG.
+
+void
+BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Channel.Ok not implemented");
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Channel.Ping not implemented");
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Channel.Pong not implemented");
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::resume(
+    u_int16_t /*channel*/,
+    const string& /*channelId*/ )
+{
+    // FIXME aconway 2007-01-04: 0-9 feature
+    throw ConnectionException(540, "Channel.Resume not implemented");
+}
+
+// Message class method handlers.
+//
+// The Message class is a 0-9 feature that is not implemented yet. Every
+// handler raises ConnectionException(540), like the unimplemented
+// get*Handler accessors in this file, instead of assert(0): the assert
+// aborts the broker in debug builds and compiles to nothing under NDEBUG,
+// in which case the request would be dropped without any reply.
+
+void
+BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
+                                           const string& /*reference*/,
+                                           const string& /*bytes*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Append not implemented");
+}
+
+
+void
+BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+                                           const string& /*destination*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Cancel not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+                                               const string& /*reference*/,
+                                               const string& /*identifier*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Checkpoint not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
+                                          const string& /*reference*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Close not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+                                            u_int16_t /*ticket*/,
+                                            const string& /*queue*/,
+                                            const string& /*destination*/,
+                                            bool /*noLocal*/,
+                                            bool /*noAck*/,
+                                            bool /*exclusive*/,
+                                            const qpid::framing::FieldTable& /*filter*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Consume not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Empty not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/,
+                                        u_int16_t /*ticket*/,
+                                        const string& /*queue*/,
+                                        const string& /*destination*/,
+                                        bool /*noAck*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Get not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+                                           u_int64_t /*value*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Offset not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Ok not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
+                                         const string& /*reference*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Open not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+                                        u_int32_t /*prefetchSize*/,
+                                        u_int16_t /*prefetchCount*/,
+                                        bool /*global*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Qos not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+                                            bool /*requeue*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Recover not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+                                           u_int16_t /*code*/,
+                                           const string& /*text*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Reject not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+                                           const string& /*reference*/,
+                                           const string& /*identifier*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Resume not implemented");
+}
+
+void
+BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+                                             u_int16_t /*ticket*/,
+                                             const string& /*destination*/,
+                                             bool /*redelivered*/,
+                                             bool /*immediate*/,
+                                             u_int64_t /*ttl*/,
+                                             u_int8_t /*priority*/,
+                                             u_int64_t /*timestamp*/,
+                                             u_int8_t /*deliveryMode*/,
+                                             u_int64_t /*expiration*/,
+                                             const string& /*exchange*/,
+                                             const string& /*routingKey*/,
+                                             const string& /*messageId*/,
+                                             const string& /*correlationId*/,
+                                             const string& /*replyTo*/,
+                                             const string& /*contentType*/,
+                                             const string& /*contentEncoding*/,
+                                             const string& /*userId*/,
+                                             const string& /*appId*/,
+                                             const string& /*transactionId*/,
+                                             const string& /*securityToken*/,
+                                             const qpid::framing::FieldTable& /*applicationHeaders*/,
+                                             qpid::framing::Content /*body*/ )
+{
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    throw ConnectionException(540, "Message.Transfer not implemented");
+}
+
+}} // namespace qpid::broker
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=auto&rev=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Tue Jan 16 19:49:35 2007
@@ -0,0 +1,261 @@
+#ifndef _broker_BrokerAdapter_h
+#define _broker_BrokerAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQP_ServerOperations.h"
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+
+/**
+ * Protocol adapter class for the broker.
+ */
+class BrokerAdapter : public qpid::framing::AMQP_ServerOperations
+{
+  public:
+    /**
+     * Build an adapter bound to one Connection. Only a reference is kept.
+     * explicit, so a Connection never converts to a BrokerAdapter silently.
+     */
+    explicit BrokerAdapter(Connection& connection);
+
+    // One accessor per AMQP protocol class. The declarations do not show
+    // which classes are backed by a handler member; see the definitions.
+    AccessHandler* getAccessHandler();
+    BasicHandler* getBasicHandler();
+    ChannelHandler* getChannelHandler();
+    ConnectionHandler* getConnectionHandler();
+    DtxHandler* getDtxHandler();
+    ExchangeHandler* getExchangeHandler();
+    FileHandler* getFileHandler();
+    MessageHandler* getMessageHandler();
+    QueueHandler* getQueueHandler();
+    StreamHandler* getStreamHandler();
+    TunnelHandler* getTunnelHandler();
+    TxHandler* getTxHandler();
+
+  private:
+
+    /** Handler for Connection-class methods. */
+    class ConnectionHandlerImpl : public ConnectionHandler{
+        Connection& connection;
+      public:
+        explicit ConnectionHandlerImpl(Connection& c) : connection(c) {}
+
+        void startOk(u_int16_t channel,
+                     const qpid::framing::FieldTable& clientProperties,
+                     const std::string& mechanism, const std::string& response,
+                     const std::string& locale);
+        void secureOk(u_int16_t channel, const std::string& response);
+        void tuneOk(u_int16_t channel, u_int16_t channelMax,
+                    u_int32_t frameMax, u_int16_t heartbeat);
+        void open(u_int16_t channel, const std::string& virtualHost,
+                  const std::string& capabilities, bool insist);
+        void close(u_int16_t channel, u_int16_t replyCode,
+                   const std::string& replyText,
+                   u_int16_t classId, u_int16_t methodId);
+        void closeOk(u_int16_t channel);
+    };
+
+    /** Handler for Channel-class methods. */
+    class ChannelHandlerImpl : public ChannelHandler{
+        Connection& connection;
+      public:
+        explicit ChannelHandlerImpl(Connection& c) : connection(c) {}
+        void open(u_int16_t channel, const std::string& outOfBand);
+        void flow(u_int16_t channel, bool active);
+        void flowOk(u_int16_t channel, bool active);
+        void ok( u_int16_t channel );
+        void ping( u_int16_t channel );
+        void pong( u_int16_t channel );
+        void resume( u_int16_t channel, const std::string& channelId );
+        void close(u_int16_t channel, u_int16_t replyCode, const
+                   std::string& replyText, u_int16_t classId, u_int16_t methodId);
+        void closeOk(u_int16_t channel);
+    };
+
+    /** Handler for Exchange-class methods. */
+    class ExchangeHandlerImpl : public ExchangeHandler{
+        Connection& connection;
+      public:
+        explicit ExchangeHandlerImpl(Connection& c) : connection(c) {}
+        void declare(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& type,
+                     bool passive, bool durable, bool autoDelete,
+                     bool internal, bool nowait,
+                     const qpid::framing::FieldTable& arguments);
+        void delete_(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, bool ifUnused, bool nowait);
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket, const std::string& queue,
+                    const std::string& exchange, const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+    };
+
+    /** Handler for Queue-class methods. */
+    class QueueHandlerImpl : public QueueHandler{
+        Connection& connection;
+      public:
+        explicit QueueHandlerImpl(Connection& c) : connection(c) {}
+        void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                     bool passive, bool durable, bool exclusive,
+                     bool autoDelete, bool nowait,
+                     const qpid::framing::FieldTable& arguments);
+        void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                  const std::string& exchange, const std::string& routingKey,
+                  bool nowait, const qpid::framing::FieldTable& arguments);
+        void unbind(u_int16_t channel,
+                    u_int16_t ticket,
+                    const std::string& queue,
+                    const std::string& exchange,
+                    const std::string& routingKey,
+                    const qpid::framing::FieldTable& arguments );
+        void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                   bool nowait);
+        void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                     bool ifUnused, bool ifEmpty,
+                     bool nowait);
+    };
+
+    /** Handler for Basic-class methods. */
+    class BasicHandlerImpl : public BasicHandler{
+        Connection& connection;
+      public:
+        explicit BasicHandlerImpl(Connection& c) : connection(c) {}
+        void qos(u_int16_t channel, u_int32_t prefetchSize,
+                 u_int16_t prefetchCount, bool global);
+        void consume(
+            u_int16_t channel, u_int16_t ticket, const std::string& queue,
+            const std::string& consumerTag, bool noLocal, bool noAck,
+            bool exclusive, bool nowait,
+            const qpid::framing::FieldTable& fields);
+        void cancel(u_int16_t channel, const std::string& consumerTag,
+                    bool nowait);
+        void publish(u_int16_t channel, u_int16_t ticket,
+                     const std::string& exchange, const std::string& routingKey,
+                     bool mandatory, bool immediate);
+        void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+                 bool noAck);
+        void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+        void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+        void recover(u_int16_t channel, bool requeue);
+    };
+
+    /** Handler for Tx-class methods. */
+    class TxHandlerImpl : public TxHandler{
+        Connection& connection;
+      public:
+        explicit TxHandlerImpl(Connection& c) : connection(c) {}
+        void select(u_int16_t channel);
+        void commit(u_int16_t channel);
+        void rollback(u_int16_t channel);
+    };
+
+    /** Handler for Message-class methods. */
+    class MessageHandlerImpl : public MessageHandler {
+        Connection& connection;
+      public:
+        explicit MessageHandlerImpl(Connection& c) : connection(c) {}
+
+        void append( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& bytes );
+
+        void cancel( u_int16_t channel,
+                     const std::string& destination );
+
+        void checkpoint( u_int16_t channel,
+                         const std::string& reference,
+                         const std::string& identifier );
+
+        void close( u_int16_t channel,
+                    const std::string& reference );
+
+        void consume( u_int16_t channel,
+                      u_int16_t ticket,
+                      const std::string& queue,
+                      const std::string& destination,
+                      bool noLocal,
+                      bool noAck,
+                      bool exclusive,
+                      const qpid::framing::FieldTable& filter );
+
+        void empty( u_int16_t channel );
+
+        void get( u_int16_t channel,
+                  u_int16_t ticket,
+                  const std::string& queue,
+                  const std::string& destination,
+                  bool noAck );
+
+        void offset( u_int16_t channel,
+                     u_int64_t value );
+
+        void ok( u_int16_t channel );
+
+        void open( u_int16_t channel,
+                   const std::string& reference );
+
+        void qos( u_int16_t channel,
+                  u_int32_t prefetchSize,
+                  u_int16_t prefetchCount,
+                  bool global );
+
+        void recover( u_int16_t channel,
+                      bool requeue );
+
+        void reject( u_int16_t channel,
+                     u_int16_t code,
+                     const std::string& text );
+
+        void resume( u_int16_t channel,
+                     const std::string& reference,
+                     const std::string& identifier );
+
+        void transfer( u_int16_t channel,
+                       u_int16_t ticket,
+                       const std::string& destination,
+                       bool redelivered,
+                       bool immediate,
+                       u_int64_t ttl,
+                       u_int8_t priority,
+                       u_int64_t timestamp,
+                       u_int8_t deliveryMode,
+                       u_int64_t expiration,
+                       const std::string& exchange,
+                       const std::string& routingKey,
+                       const std::string& messageId,
+                       const std::string& correlationId,
+                       const std::string& replyTo,
+                       const std::string& contentType,
+                       const std::string& contentEncoding,
+                       const std::string& userId,
+                       const std::string& appId,
+                       const std::string& transactionId,
+                       const std::string& securityToken,
+                       const qpid::framing::FieldTable& applicationHeaders,
+                       qpid::framing::Content body );
+    };
+
+    // The connection this adapter serves; held by reference.
+    Connection& connection;
+
+    // One handler instance per implemented protocol class. Each handler
+    // holds its own Connection reference.
+    BasicHandlerImpl basicHandler;
+    ChannelHandlerImpl channelHandler;
+    ConnectionHandlerImpl connectionHandler;
+    ExchangeHandlerImpl exchangeHandler;
+    MessageHandlerImpl messageHandler;
+    QueueHandlerImpl queueHandler;
+    TxHandlerImpl txHandler;
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_BrokerAdapter_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Jan 16 19:49:35 2007
@@ -42,12 +42,15 @@
tagGenerator("sgen"),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version){
+ version(_version),
+ isClosed(false)
+{
outstanding.reset();
}
Channel::~Channel(){
+ close();
}
bool Channel::exists(const string& consumerTag){
@@ -83,12 +86,13 @@
}
void Channel::close(){
- //cancel all consumers
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- cancel(i);
+ if (!isClosed) {
+ isClosed = true;
+ while (!consumers.empty())
+ cancel(consumers.begin());
+ //requeue:
+ recover(true);
}
- //requeue:
- recover(true);
}
void Channel::begin(){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Jan 16 19:49:35 2007
@@ -90,6 +90,7 @@
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
qpid::framing::ProtocolVersion version; // version used for this channel
+ bool isClosed;
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Tue Jan 16 19:49:35 2007
@@ -21,11 +21,9 @@
#include <iostream>
#include <assert.h>
-#include "Connection.h"
-
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
+#include "Connection.h"
+// TODO aconway 2007-01-16: move to channel.
#include "Requester.h"
#include "Responder.h"
@@ -37,50 +35,24 @@
namespace qpid {
namespace broker {
-Connection::Connection(
- SessionContext* _context, Broker& broker) :
-
- context(_context),
- client(0),
- queues(broker.getQueues()),
- exchanges(broker.getExchanges()),
- cleaner(broker.getCleaner()),
- settings(broker.getTimeout(), broker.getStagingThreshold()),
+Connection::Connection(SessionContext* context_, Broker& broker_) :
+ adapter(*this),
requester(broker.getRequester()),
responder(broker.getResponder()),
- basicHandler(new BasicHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
- connectionHandler(new ConnectionHandlerImpl(this)),
- exchangeHandler(new ExchangeHandlerImpl(this)),
- queueHandler(new QueueHandlerImpl(this)),
- txHandler(new TxHandlerImpl(this)),
- messageHandler(new MessageHandlerImpl(this)),
+ context(context_),
framemax(65536),
- heartbeat(0)
+ heartbeat(0),
+ broker(broker_),
+ settings(broker.getTimeout(), broker.getStagingThreshold())
{}
-Connection::~Connection(){
-
- if (client != NULL)
- delete client;
-
-}
-
-Channel* Connection::getChannel(u_int16_t channel){
- channel_iterator i = channels.find(channel);
- if(i == channels.end()){
- throw ConnectionException(504, "Unknown channel: " + channel);
- }
- return i->second;
-}
-
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
Queue::shared_ptr queue;
if (name.empty()) {
- queue = getChannel(channel)->getDefaultQueue();
+ queue = getChannel(channel).getDefaultQueue();
if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
} else {
- queue = queues.find(name);
+ queue = broker.getQueues().find(name);
if (queue == 0) {
throw ChannelException( 404, "Queue not found: " + name);
}
@@ -90,7 +62,7 @@
Exchange::shared_ptr Connection::findExchange(const string& name){
- return exchanges.get(name);
+ return broker.getExchanges().get(name);
}
void Connection::handleMethod(
@@ -99,10 +71,9 @@
AMQMethodBody::shared_ptr method =
shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
try{
- method->invoke(*this, channel);
+ method->invoke(adapter, channel);
}catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
+ closeChannel(channel);
client->getChannel().close(
channel, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
@@ -172,529 +143,78 @@
out.send(frame);
}
-void Connection::initiated(qpid::framing::ProtocolInitiation* header){
-
- if (client == 0)
- {
- client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
-
-
- std::cout << "---------------" << this << std::endl;
-
- //send connection start
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US"); // channel, majour, minor
- client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
- }
+void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
+ if (client.get())
+ // TODO aconway 2007-01-16: replace the placeholder 0 with the correct error code.
+ throw ConnectionException(0, "Connection initiated twice");
+
+ client.reset(new qpid::framing::AMQP_ClientProxy(
+ context, header->getMajor(), header->getMinor()));
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ // TODO aconway 2007-01-16: Move to adapter.
+ client->getConnection().start(
+ 0, header->getMajor(), header->getMinor(), properties,
+ mechanisms, locales);
}
-void Connection::idleOut(){
-
-}
+void Connection::idleOut(){}
-void Connection::idleIn(){
-
-}
+void Connection::idleIn(){}
void Connection::closed(){
try {
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues.destroy(name);
- exclusiveQueues.erase(i);
+ while (!exclusiveQueues.empty()) {
+ broker.getQueues().destroy(exclusiveQueues.front()->getName());
+ exclusiveQueues.erase(exclusiveQueues.begin());
}
} catch(std::exception& e) {
- std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+ std::cout << "Caught unhandled exception while closing session: " <<
+ e.what() << std::endl;
+ assert(0);
}
}
+// TODO aconway 2007-01-16: collapse these.
void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body);
+ getChannel(channel).handleHeader(body);
}
void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body);
+ getChannel(channel).handleContent(body);
}
void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
std::cout << "Connection::handleHeartbeat()" << std::endl;
}
-
-void Connection::ConnectionHandlerImpl::startOk(
- u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/){
- parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
-}
-
-void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
-
-void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
- parent->framemax = framemax;
- parent->heartbeat = heartbeat;
-}
-
-void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
- string knownhosts;
- parent->client->getConnection().openOk(0, knownhosts);
-}
-
-void Connection::ConnectionHandlerImpl::close(
- u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/)
-{
- parent->client->getConnection().closeOk(0);
- parent->context->close();
-}
-
-void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
- parent->context->close();
-}
-
-
-
-void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
-
-
- parent->channels[channel] = new Channel(
- parent->client->getProtocolVersion() , parent->context, channel,
- parent->framemax, parent->queues.getStore(),
- parent->settings.stagingThreshold);
-
- // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- parent->client->getChannel().openOk(channel, std::string()/* ID */);
-}
-
-void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-
-void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/){
- Channel* c = parent->getChannel(channel);
- if(c){
- parent->channels.erase(channel);
- c->close();
- delete c;
- parent->client->getChannel().closeOk(channel);
- }
-}
-
-void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-
-
-
-void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
-
- if(passive){
- if(!parent->exchanges.get(exchange)){
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
- }else{
- try{
- std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
- if(!response.second && response.first->getType() != type){
- throw ConnectionException(507, "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
- }
- }catch(UnknownExchangeTypeException& e){
- throw ConnectionException(503, "Exchange type not implemented: " + type);
- }
- }
- if(!nowait){
- parent->client->getExchange().declareOk(channel);
- }
-}
-
-
-void Connection::ExchangeHandlerImpl::unbind(
- u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const qpid::framing::FieldTable& /*arguments*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-
-
-void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
-
- //TODO: implement unused
- parent->exchanges.destroy(exchange);
- if(!nowait) parent->client->getExchange().deleteOk(channel);
-}
-
-void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = parent->getQueue(name, channel);
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- parent->getChannel(channel)->setDefaultQueue(queue);
-
- //apply settings & create persistent record if required
- queue_created.first->create(arguments);
-
- //add default binding:
- parent->exchanges.getDefault()->bind(queue, name, 0);
- if (exclusive) {
- parent->exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- parent->cleaner.add(queue);
- }
- }
- }
- if (exclusive && !queue->isExclusiveOwner(parent)) {
- throw ChannelException(405, "Cannot grant exclusive access to queue");
- }
- if (!nowait) {
- string queueName = queue->getName();
- parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
- }
-}
-
-void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
- if(exchange){
- // kpvdr - cannot use this any longer as routingKey is now const
- // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
- // exchange->bind(queue, routingKey, &arguments);
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) parent->client->getQueue().bindOk(channel);
- }else{
- throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- int count = queue->purge();
- if(!nowait) parent->client->getQueue().purgeOk(channel, count);
-}
-
-void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
- ChannelException error(0, "");
- int count(0);
- Queue::shared_ptr q = parent->getQueue(queue, channel);
- if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(parent)){
- queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
- if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
- }
- count = q->getMessageCount();
- q->destroy();
- parent->queues.destroy(queue);
- }
-
- if(!nowait) parent->client->getQueue().deleteOk(channel, count);
-}
-
-
-
-
-void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- parent->getChannel(channel)->setPrefetchSize(prefetchSize);
- parent->getChannel(channel)->setPrefetchCount(prefetchCount);
- parent->client->getBasic().qosOk(channel);
-}
-
-void Connection::BasicHandlerImpl::consume(
- u_int16_t channelId, u_int16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
-{
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- Channel* channel = parent->channels[channelId];
- if(!consumerTag.empty() && channel->exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = consumerTag;
- channel->consume(
- newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields);
-
- if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
-}
-
-void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
- parent->getChannel(channel)->cancel(consumerTag);
-
- if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
-}
-
-void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate){
-
- Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
- if(exchange){
- Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg, exchange);
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
- }
-}
-
-void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- if(!parent->getChannel(channelId)->get(queue, !noAck)){
- string clusterId;//not used, part of an imatix hack
-
- parent->client->getBasic().getEmpty(channelId, clusterId);
- }
-}
-
-void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
- try{
- parent->getChannel(channel)->ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }
-}
-
-void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
- parent->getChannel(channel)->recover(requeue);
-}
-
-void Connection::TxHandlerImpl::select(u_int16_t channel){
- parent->getChannel(channel)->begin();
- parent->client->getTx().selectOk(channel);
-}
-
-void Connection::TxHandlerImpl::commit(u_int16_t channel){
- parent->getChannel(channel)->commit();
- parent->client->getTx().commitOk(channel);
-}
-
-void Connection::TxHandlerImpl::rollback(u_int16_t channel){
-
- parent->getChannel(channel)->rollback();
- parent->client->getTx().rollbackOk(channel);
- parent->getChannel(channel)->recover(false);
-}
-
-void
-Connection::QueueHandlerImpl::unbind(
- u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const qpid::framing::FieldTable& /*arguments*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-void
-Connection::ChannelHandlerImpl::resume(
- u_int16_t /*channel*/,
- const string& /*channelId*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-// Message class method handlers
-void
-Connection::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-void
-Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+void Connection::openChannel(u_int16_t channel) {
+ if (channel == 0)
+ throw ConnectionException(504, "Illegal channel 0");
+ if (channels.find(channel) != channels.end())
+ throw ConnectionException(504, "Channel already open: " + channel);
+ channels.insert(
+ channel,
+ new Channel(
+ client->getProtocolVersion(), context, channel, framemax,
+ broker.getQueues().getStore(), settings.stagingThreshold));
+}
+
+void Connection::closeChannel(u_int16_t channel) {
+ getChannel(channel).close(); // throws if channel does not exist.
+ channels.erase(channels.find(channel));
}
-void
-Connection::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-void
-Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+Channel& Connection::getChannel(u_int16_t channel){
+ ChannelMap::iterator i = channels.find(channel);
+ if(i == channels.end())
+ throw ConnectionException(504, "Unknown channel: " + channel);
+ return *i;
}
-void
-Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-void
-Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Tue Jan 16 19:49:35 2007
@@ -21,10 +21,11 @@
#ifndef _Connection_
#define _Connection_
-#include <map>
#include <sstream>
#include <vector>
+#include <boost/ptr_container/ptr_map.hpp>
+
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
@@ -32,6 +33,7 @@
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
#include "Broker.h"
+#include "BrokerAdapter.h"
#include "Exception.h"
namespace qpid {
@@ -46,11 +48,13 @@
};
class Connection : public qpid::sys::ConnectionInputHandler,
- public qpid::framing::AMQP_ServerOperations,
- public ConnectionToken
+ public ConnectionToken
{
- typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef boost::ptr_map<u_int16_t, Channel> ChannelMap;
+
+ // TODO aconway 2007-01-16: belongs on broker.
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
class Sender : public qpid::framing::OutputHandler {
public:
Sender(qpid::framing::OutputHandler&,
@@ -61,35 +65,44 @@
qpid::framing::Requester& requester;
qpid::framing::Responder& responder;
};
-
- qpid::sys::SessionContext* context;
- qpid::framing::AMQP_ClientProxy* client;
- QueueRegistry& queues;
- ExchangeRegistry& exchanges;
- AutoDelete& cleaner;
- Settings settings;
+
+ BrokerAdapter adapter;
+ // FIXME aconway 2007-01-16: On Channel
qpid::framing::Requester& requester;
qpid::framing::Responder& responder;
- std::auto_ptr<BasicHandler> basicHandler;
- std::auto_ptr<ChannelHandler> channelHandler;
- std::auto_ptr<ConnectionHandler> connectionHandler;
- std::auto_ptr<ExchangeHandler> exchangeHandler;
- std::auto_ptr<QueueHandler> queueHandler;
- std::auto_ptr<TxHandler> txHandler;
- std::auto_ptr<MessageHandler> messageHandler;
+ ChannelMap channels;
- std::map<u_int16_t, Channel*> channels;
- std::vector<Queue::shared_ptr> exclusiveQueues;
+ void handleHeader(u_int16_t channel,
+ qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel,
+ qpid::framing::AMQContentBody::shared_ptr body);
+ void handleMethod(u_int16_t channel,
+ qpid::framing::AMQBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ // FIXME aconway 2007-01-16: on broker.
+ Exchange::shared_ptr findExchange(const string& name);
+
+ public:
+ Connection(qpid::sys::SessionContext* context, Broker& broker);
+ // ConnectionInputHandler methods
+ void received(qpid::framing::AMQFrame* frame);
+ void initiated(qpid::framing::ProtocolInitiation* header);
+ void idleOut();
+ void idleIn();
+ void closed();
+
+ // FIXME aconway 2007-01-16: encapsulate.
+ qpid::sys::SessionContext* context;
u_int32_t framemax;
u_int16_t heartbeat;
+ Broker& broker;
+ std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
+ Settings settings;
+ // FIXME aconway 2007-01-16: Belongs on broker?
+ std::vector<Queue::shared_ptr> exclusiveQueues;
- void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
- void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
- void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body);
- void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- Channel* getChannel(u_int16_t channel);
+ // FIXME aconway 2007-01-16: move to broker.
/**
* Get named queue, never returns 0.
* @return: named queue or default queue for channel if name=""
@@ -98,261 +111,10 @@
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Exchange::shared_ptr findExchange(const string& name);
-
- public:
- Connection(qpid::sys::SessionContext* context, Broker& broker);
- virtual void received(qpid::framing::AMQFrame* frame);
- virtual void initiated(qpid::framing::ProtocolInitiation* header);
- virtual void idleOut();
- virtual void idleIn();
- virtual void closed();
- virtual ~Connection();
-
- class ConnectionHandlerImpl : public ConnectionHandler{
- Connection* parent;
- public:
- inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {}
-
- virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,
- const string& response, const string& locale);
-
- virtual void secureOk(u_int16_t channel, const string& response);
-
- virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
-
- virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);
-
- virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,
- u_int16_t methodId);
-
- virtual void closeOk(u_int16_t channel);
-
- virtual ~ConnectionHandlerImpl(){}
- };
-
- class ChannelHandlerImpl : public ChannelHandler{
- Connection* parent;
- public:
- inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {}
-
- virtual void open(u_int16_t channel, const string& outOfBand);
-
- virtual void flow(u_int16_t channel, bool active);
-
- virtual void flowOk(u_int16_t channel, bool active);
-
- virtual void ok( u_int16_t channel );
-
- virtual void ping( u_int16_t channel );
-
- virtual void pong( u_int16_t channel );
-
- virtual void resume( u_int16_t channel,
- const string& channelId );
-
- virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
- u_int16_t classId, u_int16_t methodId);
-
- virtual void closeOk(u_int16_t channel);
-
- virtual ~ChannelHandlerImpl(){}
- };
-
- class ExchangeHandlerImpl : public ExchangeHandler{
- Connection* parent;
- public:
- inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {}
-
- virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
- bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
-
- virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);
-
- virtual void unbind(u_int16_t channel,
- u_int16_t ticket,
- const string& queue,
- const string& exchange,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments );
-
- virtual ~ExchangeHandlerImpl(){}
- };
-
-
- class QueueHandlerImpl : public QueueHandler{
- Connection* parent;
- public:
- inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {}
-
- virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);
-
- virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,
- const string& exchange, const string& routingKey, bool nowait,
- const qpid::framing::FieldTable& arguments);
-
- virtual void unbind(u_int16_t channel,
- u_int16_t ticket,
- const string& queue,
- const string& exchange,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments );
-
- virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
- bool nowait);
-
- virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,
- bool nowait);
-
- virtual ~QueueHandlerImpl(){}
- };
-
- class BasicHandlerImpl : public BasicHandler{
- Connection* parent;
- public:
- inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {}
-
- virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
-
- virtual void consume(
- u_int16_t channel, u_int16_t ticket, const string& queue,
- const string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
-
- virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);
-
- virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
-
- virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);
-
- virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
-
- virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
-
- virtual void recover(u_int16_t channel, bool requeue);
-
- virtual ~BasicHandlerImpl(){}
- };
-
- class TxHandlerImpl : public TxHandler{
- Connection* parent;
- public:
- TxHandlerImpl(Connection* _parent) : parent(_parent) {}
- virtual ~TxHandlerImpl() {}
- virtual void select(u_int16_t channel);
- virtual void commit(u_int16_t channel);
- virtual void rollback(u_int16_t channel);
- };
-
- class MessageHandlerImpl : public MessageHandler {
- Connection* parent;
-
- // Constructors and destructors
-
- public:
- MessageHandlerImpl() {}
- MessageHandlerImpl(Connection* _parent) : parent(_parent) {}
- virtual ~MessageHandlerImpl() {}
-
- // Protocol methods
- virtual void append( u_int16_t channel,
- const string& reference,
- const string& bytes );
-
- virtual void cancel( u_int16_t channel,
- const string& destination );
-
- virtual void checkpoint( u_int16_t channel,
- const string& reference,
- const string& identifier );
-
- virtual void close( u_int16_t channel,
- const string& reference );
-
- virtual void consume( u_int16_t channel,
- u_int16_t ticket,
- const string& queue,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter );
-
- virtual void empty( u_int16_t channel );
-
- virtual void get( u_int16_t channel,
- u_int16_t ticket,
- const string& queue,
- const string& destination,
- bool noAck );
-
- virtual void offset( u_int16_t channel,
- u_int64_t value );
-
- virtual void ok( u_int16_t channel );
-
- virtual void open( u_int16_t channel,
- const string& reference );
-
- virtual void qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool global );
-
- virtual void recover( u_int16_t channel,
- bool requeue );
-
- virtual void reject( u_int16_t channel,
- u_int16_t code,
- const string& text );
-
- virtual void resume( u_int16_t channel,
- const string& reference,
- const string& identifier );
-
- virtual void transfer( u_int16_t channel,
- u_int16_t ticket,
- const string& destination,
- bool redelivered,
- bool immediate,
- u_int64_t ttl,
- u_int8_t priority,
- u_int64_t timestamp,
- u_int8_t deliveryMode,
- u_int64_t expiration,
- const string& exchange,
- const string& routingKey,
- const string& messageId,
- const string& correlationId,
- const string& replyTo,
- const string& contentType,
- const string& contentEncoding,
- const string& userId,
- const string& appId,
- const string& transactionId,
- const string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
- };
- virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
- virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
- virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
- virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
- virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
- virtual TxHandler* getTxHandler(){ return txHandler.get(); }
- virtual MessageHandler* getMessageHandler(){ return messageHandler.get(); }
-
- virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }
- virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
- virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
- virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
- virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
+ void openChannel(u_int16_t channel);
+ void closeChannel(u_int16_t channel);
+ Channel& getChannel(u_int16_t channel);
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am Tue Jan 16 19:49:35 2007
@@ -65,10 +65,12 @@
QueueRegistry.h \
RecoveryManager.cpp \
RecoveryManager.h \
- ConnectionFactory.cpp \
- ConnectionFactory.h \
- Connection.cpp \
- Connection.h \
+ ConnectionFactory.cpp \
+ ConnectionFactory.h \
+ Connection.cpp \
+ Connection.h \
+ BrokerAdapter.cpp \
+ BrokerAdapter.h \
TopicExchange.cpp \
TopicExchange.h \
TransactionalStore.h \
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h?view=diff&rev=496926&r1=496925&r2=496926
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h Tue Jan 16 19:49:35 2007
@@ -36,7 +36,7 @@
namespace framing {
using std::string;
-
+typedef u_int16_t ChannelId;
typedef u_int64_t RequestId;
typedef u_int64_t ResponseId;
typedef u_int32_t BatchOffset;