You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/18 07:27:52 UTC
svn commit: r497319 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/
lib/client/ lib/common/framing/ tests/
Author: aconway
Date: Wed Jan 17 22:27:50 2007
New Revision: 497319
URL: http://svn.apache.org/viewvc?view=rev&rev=497319
Log:
There are a ton of FIXMEs, and request/response IDs are not yet fully working,
but all tests are passing.
* broker::Broker: Removed requester/responder from broker.
* framing::BodyHandler: added Requester/Responder to BodyHandler, becomes
the base class for channel adapters in broker and client.
* broker::BrokerAdapter: Inherit BodyHandler, wraps a broker::Channel.
Hide private *HandlerImpl detail classes in BodyHandler.cpp.
* broker::Connection: Requester/Responder/Adapter now per-channel.
Connection channel map replaced with adapter map of BrokerAdapters.
handle* functions moved to BrokerAdapter.
All methods now handled by a BrokerAdapter for the relevant channel.
ChannelHandlerImpl is responsible for checking that
- No method on a non-0 channel is processed before open()
- Channel 0 methods only happen on channel 0 and similar for non-zero methods
Checks are not yet complete (see FIXMES)
* client::ResponseHandler: fix for client hang if broker crashes.
Removed:
incubator/qpid/branches/qpid.0-9/cpp/tests/BodyHandlerTest.cpp
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Wed Jan 17 22:27:50 2007
@@ -29,8 +29,6 @@
#include <SharedObject.h>
#include <MessageStore.h>
#include <AutoDelete.h>
-#include "Requester.h"
-#include "Responder.h"
#include <ExchangeRegistry.h>
#include <BrokerChannel.h>
#include <ConnectionToken.h>
@@ -86,8 +84,6 @@
u_int32_t getTimeout() { return timeout; }
u_int64_t getStagingThreshold() { return stagingThreshold; }
AutoDelete& getCleaner() { return cleaner; }
- qpid::framing::Requester& getRequester() { return requester; }
- qpid::framing::Responder& getResponder() { return responder; }
private:
Broker(const Configuration& config);
@@ -100,8 +96,6 @@
u_int64_t stagingThreshold;
AutoDelete cleaner;
ConnectionFactory factory;
- qpid::framing::Requester requester;
- qpid::framing::Responder responder;
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Wed Jan 17 22:27:50 2007
@@ -15,10 +15,11 @@
* limitations under the License.
*
*/
-
#include "BrokerAdapter.h"
#include "Connection.h"
#include "Exception.h"
+#include "AMQMethodBody.h"
+#include "Exception.h"
namespace qpid {
namespace broker {
@@ -28,75 +29,263 @@
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-BrokerAdapter::BrokerAdapter(Connection& c) :
- connection(c),
- basicHandler(c),
- channelHandler(c),
- connectionHandler(c),
- exchangeHandler(c),
- messageHandler(c),
- queueHandler(c),
- txHandler(c)
-{}
-
-typedef qpid::framing::AMQP_ServerOperations Ops;
+class BrokerAdapter::ServerOps : public AMQP_ServerOperations
+{
+ public:
+ ServerOps(Channel& ch, Connection& c, Broker& b) :
+ basicHandler(ch, c, b),
+ channelHandler(ch, c, b),
+ connectionHandler(ch, c, b),
+ exchangeHandler(ch, c, b),
+ messageHandler(ch, c, b),
+ queueHandler(ch, c, b),
+ txHandler(ch, c, b)
+ {}
+
+ ChannelHandler* getChannelHandler() { return &channelHandler; }
+ ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+ BasicHandler* getBasicHandler() { return &basicHandler; }
+ ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ QueueHandler* getQueueHandler() { return &queueHandler; }
+ TxHandler* getTxHandler() { return &txHandler; }
+ MessageHandler* getMessageHandler() { return &messageHandler; }
+ AccessHandler* getAccessHandler() {
+ throw ConnectionException(540, "Access class not implemented"); }
+ FileHandler* getFileHandler() {
+ throw ConnectionException(540, "File class not implemented"); }
+ StreamHandler* getStreamHandler() {
+ throw ConnectionException(540, "Stream class not implemented"); }
+ DtxHandler* getDtxHandler() {
+ throw ConnectionException(540, "Dtx class not implemented"); }
+ TunnelHandler* getTunnelHandler() {
+ throw ConnectionException(540, "Tunnel class not implemented"); }
+
+ private:
+ struct CoreRefs {
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b) {}
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ };
+
+ class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+ public:
+ ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+ void startOk(u_int16_t channel,
+ const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(u_int16_t channel, const std::string& response);
+ void tuneOk(u_int16_t channel, u_int16_t channelMax,
+ u_int32_t frameMax, u_int16_t heartbeat);
+ void open(u_int16_t channel, const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(u_int16_t channel, u_int16_t replyCode,
+ const std::string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+ void closeOk(u_int16_t channel);
+ };
+
+ class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+ public:
+ ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void open(u_int16_t channel, const std::string& outOfBand);
+ void flow(u_int16_t channel, bool active);
+ void flowOk(u_int16_t channel, bool active);
+ void ok( u_int16_t channel );
+ void ping( u_int16_t channel );
+ void pong( u_int16_t channel );
+ void resume( u_int16_t channel, const std::string& channelId );
+ void close(u_int16_t channel, u_int16_t replyCode, const
+ std::string& replyText, u_int16_t classId, u_int16_t methodId);
+ void closeOk(u_int16_t channel);
+ };
+
+ class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+ public:
+ ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(u_int16_t channel, u_int16_t ticket,
+ const std::string& exchange, const std::string& type,
+ bool passive, bool durable, bool autoDelete,
+ bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(u_int16_t channel, u_int16_t ticket,
+ const std::string& exchange, bool ifUnused, bool nowait);
+ void unbind(u_int16_t channel,
+ u_int16_t ticket, const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments );
+ };
+
+ class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+ public:
+ QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ bool nowait, const qpid::framing::FieldTable& arguments);
+ void unbind(u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments );
+ void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ bool nowait);
+ void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ bool ifUnused, bool ifEmpty,
+ bool nowait);
+ };
+
+ class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+ public:
+ BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void qos(u_int16_t channel, u_int32_t prefetchSize,
+ u_int16_t prefetchCount, bool global);
+ void consume(
+ u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ const std::string& consumerTag, bool noLocal, bool noAck,
+ bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(u_int16_t channel, const std::string& consumerTag,
+ bool nowait);
+ void publish(u_int16_t channel, u_int16_t ticket,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ bool noAck);
+ void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+ void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+ void recover(u_int16_t channel, bool requeue);
+ };
+
+ class TxHandlerImpl : private CoreRefs, public TxHandler{
+ public:
+ TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void select(u_int16_t channel);
+ void commit(u_int16_t channel);
+ void rollback(u_int16_t channel);
+ };
+
+ class MessageHandlerImpl : private CoreRefs, public MessageHandler {
+ public:
+ MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+ void append( u_int16_t channel,
+ const std::string& reference,
+ const std::string& bytes );
+
+ void cancel( u_int16_t channel,
+ const std::string& destination );
+
+ void checkpoint( u_int16_t channel,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void close( u_int16_t channel,
+ const std::string& reference );
+
+ void consume( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const qpid::framing::FieldTable& filter );
+
+ void empty( u_int16_t channel );
+
+ void get( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noAck );
+
+ void offset( u_int16_t channel,
+ u_int64_t value );
+
+ void ok( u_int16_t channel );
+
+ void open( u_int16_t channel,
+ const std::string& reference );
+
+ void qos( u_int16_t channel,
+ u_int32_t prefetchSize,
+ u_int16_t prefetchCount,
+ bool global );
+
+ void recover( u_int16_t channel,
+ bool requeue );
+
+ void reject( u_int16_t channel,
+ u_int16_t code,
+ const std::string& text );
+
+ void resume( u_int16_t channel,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void transfer( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& destination,
+ bool redelivered,
+ bool immediate,
+ u_int64_t ttl,
+ u_int8_t priority,
+ u_int64_t timestamp,
+ u_int8_t deliveryMode,
+ u_int64_t expiration,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& messageId,
+ const std::string& correlationId,
+ const std::string& replyTo,
+ const std::string& contentType,
+ const std::string& contentEncoding,
+ const std::string& userId,
+ const std::string& appId,
+ const std::string& transactionId,
+ const std::string& securityToken,
+ const qpid::framing::FieldTable& applicationHeaders,
+ qpid::framing::Content body );
+ };
+
+ BasicHandlerImpl basicHandler;
+ ChannelHandlerImpl channelHandler;
+ ConnectionHandlerImpl connectionHandler;
+ ExchangeHandlerImpl exchangeHandler;
+ MessageHandlerImpl messageHandler;
+ QueueHandlerImpl queueHandler;
+ TxHandlerImpl txHandler;
-Ops::ChannelHandler* BrokerAdapter::getChannelHandler() {
- return &channelHandler;
-}
-Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler() {
- return &connectionHandler;
-}
-Ops::BasicHandler* BrokerAdapter::getBasicHandler() {
- return &basicHandler;
-}
-Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler() {
- return &exchangeHandler;
-}
-Ops::QueueHandler* BrokerAdapter::getQueueHandler() {
- return &queueHandler;
-}
-Ops::TxHandler* BrokerAdapter::getTxHandler() {
- return &txHandler;
-}
-Ops::MessageHandler* BrokerAdapter::getMessageHandler() {
- return &messageHandler;
-}
-Ops::AccessHandler* BrokerAdapter::getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented");
-}
-Ops::FileHandler* BrokerAdapter::getFileHandler() {
- throw ConnectionException(540, "File class not implemented");
-}
-Ops::StreamHandler* BrokerAdapter::getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented");
-}
-Ops::DtxHandler* BrokerAdapter::getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented");
-}
-Ops::TunnelHandler* BrokerAdapter::getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented");
-}
+};
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
}
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
connection.framemax = framemax;
connection.heartbeat = heartbeat;
}
-void BrokerAdapter::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
connection.client->getConnection().openOk(0, knownhosts);
}
-void BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
@@ -104,47 +293,55 @@
connection.context->close();
}
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
connection.context->close();
}
-void BrokerAdapter::ChannelHandlerImpl::open(
- u_int16_t channel, const string& /*outOfBand*/){
- connection.openChannel(channel);
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+ u_int16_t channelId, const string& /*outOfBand*/){
+ // FIXME aconway 2007-01-17: Assertions on all channel methods,
+ // Drop channelId param.
+ assertChannelNonZero(channel.getId());
+ if (channel.isOpen())
+ throw ConnectionException(504, "Channel already open");
+ channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channel, std::string()/* ID */);
+ connection.client->getChannel().openOk(channelId, std::string()/* ID */);
}
-void BrokerAdapter::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/){
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/){
connection.closeChannel(channel);
connection.client->getChannel().closeOk(channel);
}
-void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
- if(!connection.broker.getExchanges().get(exchange)){
- throw ChannelException(404, "Exchange not found: " + exchange);
+ if(!broker.getExchanges().get(exchange)) {
+ throw ChannelException(404, "Exchange not found: " + exchange);
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = connection.broker.getExchanges().declare(exchange, type);
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
if(!response.second && response.first->getType() != type){
- throw ConnectionException(507, "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
+ throw ConnectionException(
+ 507,
+ "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
}
}catch(UnknownExchangeTypeException& e){
- throw ConnectionException(503, "Exchange type not implemented: " + type);
+ throw ConnectionException(
+ 503, "Exchange type not implemented: " + type);
}
}
if(!nowait){
@@ -153,7 +350,7 @@
}
-void BrokerAdapter::ExchangeHandlerImpl::unbind(
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
@@ -166,23 +363,23 @@
-void BrokerAdapter::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- connection.broker.getExchanges().destroy(exchange);
+ broker.getExchanges().destroy(exchange);
if(!nowait) connection.client->getExchange().deleteOk(channel);
}
-void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- connection.broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+ broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -192,11 +389,11 @@
queue_created.first->create(arguments);
//add default binding:
- connection.broker.getExchanges().getDefault()->bind(queue, name, 0);
+ broker.getExchanges().getDefault()->bind(queue, name, 0);
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
} else if(autoDelete){
- connection.broker.getCleaner().add(queue);
+ broker.getCleaner().add(queue);
}
}
}
@@ -209,12 +406,12 @@
}
}
-void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel);
- Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName);
+ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -223,19 +420,20 @@
exchange->bind(queue, exchangeRoutingKey, &arguments);
if(!nowait) connection.client->getQueue().bindOk(channel);
}else{
- throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ throw ChannelException(
+ 404, "Bind failed. No such exchange: " + exchangeName);
}
}
-void BrokerAdapter::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel);
int count = queue->purge();
if(!nowait) connection.client->getQueue().purgeOk(channel, count);
}
-void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel);
@@ -251,7 +449,7 @@
}
count = q->getMessageCount();
q->destroy();
- connection.broker.getQueues().destroy(queue);
+ broker.getQueues().destroy(queue);
}
if(!nowait) connection.client->getQueue().deleteOk(channel, count);
@@ -260,14 +458,14 @@
-void BrokerAdapter::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
connection.getChannel(channel).setPrefetchSize(prefetchSize);
connection.getChannel(channel).setPrefetchCount(prefetchCount);
connection.client->getBasic().qosOk(channel);
}
-void BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
u_int16_t channelId, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -296,26 +494,27 @@
}
-void BrokerAdapter::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
connection.getChannel(channel).cancel(consumerTag);
if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag);
}
-void BrokerAdapter::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate){
- Exchange::shared_ptr exchange = exchangeName.empty() ? connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
+ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
connection.getChannel(channel).handlePublish(msg, exchange);
}else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ throw ChannelException(
+ 404, "Exchange not found '" + exchangeName + "'");
}
}
-void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
if(!connection.getChannel(channelId).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -324,7 +523,7 @@
}
}
-void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
try{
connection.getChannel(channel).ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -332,23 +531,23 @@
}
}
-void BrokerAdapter::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
connection.getChannel(channel).recover(requeue);
}
-void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){
connection.getChannel(channel).begin();
connection.client->getTx().selectOk(channel);
}
-void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){
connection.getChannel(channel).commit();
connection.client->getTx().commitOk(channel);
}
-void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){
connection.getChannel(channel).rollback();
connection.client->getTx().rollbackOk(channel);
@@ -356,7 +555,7 @@
}
void
-BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
@@ -368,25 +567,25 @@
}
void
-BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
const string& /*channelId*/ )
{
@@ -395,143 +594,191 @@
// Message class method handlers
void
-BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+ const string& /*destination*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/,
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noLocal*/,
+ bool /*noAck*/,
+ bool /*exclusive*/,
+ const qpid::framing::FieldTable& /*filter*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+ u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/,
+ const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+ u_int32_t /*prefetchSize*/,
+ u_int16_t /*prefetchCount*/,
+ bool /*global*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+ bool /*requeue*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content /*body*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
+BrokerAdapter::BrokerAdapter(
+ Channel* ch, Connection& c, Broker& b
+) :
+ channel(ch),
+ connection(c),
+ broker(b),
+ serverOps(new ServerOps(*ch,c,b))
+{
+ assert(ch);
+}
+
+void BrokerAdapter::handleMethod(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+{
+ try{
+ // FIXME aconway 2007-01-17: invoke to take Channel&?
+ method->invoke(*serverOps, channel->getId());
+ }catch(ChannelException& e){
+ connection.closeChannel(channel->getId());
+ connection.client->getChannel().close(
+ channel->getId(), e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ 0, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ 0, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+ channel->handleHeader(body);
+}
+
+void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
+ channel->handleContent(body);
+}
+
+void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+ // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
}} // namespace qpid::broker
+
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Wed Jan 17 22:27:50 2007
@@ -20,237 +20,45 @@
*/
#include "AMQP_ServerOperations.h"
+#include "BodyHandler.h"
+#include "BrokerChannel.h"
namespace qpid {
namespace broker {
+class AMQMethodBody;
class Connection;
+class Broker;
+
+// FIXME aconway 2007-01-17: Rename to ChannelAdapter.
/**
- * Protocol adapter class for the broker.
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Broker objects.
+ *
+ * Owns a channel, has references to Connection and Broker.
*/
-class BrokerAdapter : public qpid::framing::AMQP_ServerOperations
+class BrokerAdapter : public qpid::framing::BodyHandler
{
public:
- BrokerAdapter(Connection& connection);
- AccessHandler* getAccessHandler();
- BasicHandler* getBasicHandler();
- ChannelHandler* getChannelHandler();
- ConnectionHandler* getConnectionHandler();
- DtxHandler* getDtxHandler();
- ExchangeHandler* getExchangeHandler();
- FileHandler* getFileHandler();
- MessageHandler* getMessageHandler();
- QueueHandler* getQueueHandler();
- StreamHandler* getStreamHandler();
- TunnelHandler* getTunnelHandler();
- TxHandler* getTxHandler();
+ // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel>
+ BrokerAdapter(Channel* ch, Connection&, Broker&);
+ Channel& getChannel() { return *channel; }
+
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
private:
+ class ServerOps;
- class ConnectionHandlerImpl : public ConnectionHandler{
- Connection& connection;
- public:
- ConnectionHandlerImpl(Connection& c) : connection(c) {}
-
- void startOk(u_int16_t channel,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(u_int16_t channel, const std::string& response);
- void tuneOk(u_int16_t channel, u_int16_t channelMax,
- u_int32_t frameMax, u_int16_t heartbeat);
- void open(u_int16_t channel, const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(u_int16_t channel, u_int16_t replyCode,
- const std::string& replyText,
- u_int16_t classId, u_int16_t methodId);
- void closeOk(u_int16_t channel);
- };
-
- class ChannelHandlerImpl : public ChannelHandler{
- Connection& connection;
- public:
- ChannelHandlerImpl(Connection& c) : connection(c) {}
- void open(u_int16_t channel, const std::string& outOfBand);
- void flow(u_int16_t channel, bool active);
- void flowOk(u_int16_t channel, bool active);
- void ok( u_int16_t channel );
- void ping( u_int16_t channel );
- void pong( u_int16_t channel );
- void resume( u_int16_t channel, const std::string& channelId );
- void close(u_int16_t channel, u_int16_t replyCode, const
- std::string& replyText, u_int16_t classId, u_int16_t methodId);
- void closeOk(u_int16_t channel);
- };
-
- class ExchangeHandlerImpl : public ExchangeHandler{
- Connection& connection;
- public:
- ExchangeHandlerImpl(Connection& c) : connection(c) {}
- void declare(u_int16_t channel, u_int16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void delete_(u_int16_t channel, u_int16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
- void unbind(u_int16_t channel,
- u_int16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- };
-
- class QueueHandlerImpl : public QueueHandler{
- Connection& connection;
- public:
- QueueHandlerImpl(Connection& c) : connection(c) {}
- void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue,
- bool nowait);
- void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
- };
-
- class BasicHandlerImpl : public BasicHandler{
- Connection& connection;
- public:
- BasicHandlerImpl(Connection& c) : connection(c) {}
- void qos(u_int16_t channel, u_int32_t prefetchSize,
- u_int16_t prefetchCount, bool global);
- void consume(
- u_int16_t channel, u_int16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(u_int16_t channel, const std::string& consumerTag,
- bool nowait);
- void publish(u_int16_t channel, u_int16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
- void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
- bool noAck);
- void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
- void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
- void recover(u_int16_t channel, bool requeue);
- };
-
- class TxHandlerImpl : public TxHandler{
- Connection& connection;
- public:
- TxHandlerImpl(Connection& c) : connection(c) {}
- void select(u_int16_t channel);
- void commit(u_int16_t channel);
- void rollback(u_int16_t channel);
- };
-
- class MessageHandlerImpl : public MessageHandler {
- Connection& connection;
- public:
- MessageHandlerImpl(Connection& c) : connection(c) {}
-
- void append( u_int16_t channel,
- const std::string& reference,
- const std::string& bytes );
-
- void cancel( u_int16_t channel,
- const std::string& destination );
-
- void checkpoint( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void close( u_int16_t channel,
- const std::string& reference );
-
- void consume( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter );
-
- void empty( u_int16_t channel );
-
- void get( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset( u_int16_t channel,
- u_int64_t value );
-
- void ok( u_int16_t channel );
-
- void open( u_int16_t channel,
- const std::string& reference );
-
- void qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool global );
-
- void recover( u_int16_t channel,
- bool requeue );
-
- void reject( u_int16_t channel,
- u_int16_t code,
- const std::string& text );
-
- void resume( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void transfer( u_int16_t channel,
- u_int16_t ticket,
- const std::string& destination,
- bool redelivered,
- bool immediate,
- u_int64_t ttl,
- u_int8_t priority,
- u_int64_t timestamp,
- u_int8_t deliveryMode,
- u_int64_t expiration,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& messageId,
- const std::string& correlationId,
- const std::string& replyTo,
- const std::string& contentType,
- const std::string& contentEncoding,
- const std::string& userId,
- const std::string& appId,
- const std::string& transactionId,
- const std::string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
- };
-
+ std::auto_ptr<Channel> channel;
Connection& connection;
-
- BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
- ExchangeHandlerImpl exchangeHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
+ Broker& broker;
+ boost::shared_ptr<ServerOps> serverOps;
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Jan 17 22:27:50 2007
@@ -18,12 +18,13 @@
* under the License.
*
*/
-#include <BrokerChannel.h>
-#include <QpidError.h>
#include <iostream>
#include <sstream>
#include <assert.h>
+#include <BrokerChannel.h>
+#include <QpidError.h>
+
using std::mem_fun_ref;
using std::bind2nd;
using namespace qpid::broker;
@@ -31,9 +32,13 @@
using namespace qpid::sys;
-Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
- id(_id),
- out(_out),
+Channel::Channel(
+ const ProtocolVersion& _version, OutputHandler* _out, int _id,
+ u_int32_t _framesize, MessageStore* const _store,
+ u_int64_t _stagingThreshold
+) :
+ id(_id),
+ out(*_out),
currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
@@ -43,9 +48,8 @@
store(_store),
messageBuilder(this, _store, _stagingThreshold),
version(_version),
- isClosed(false)
+ opened(false)
{
-
outstanding.reset();
}
@@ -57,7 +61,7 @@
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) {
if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
@@ -86,8 +90,8 @@
}
void Channel::close(){
- if (!isClosed) {
- isClosed = true;
+ if (isOpen()) {
+ opened = false;
while (!consumers.empty())
cancel(consumers.begin());
//requeue:
@@ -123,7 +127,7 @@
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+ msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -180,6 +184,10 @@
messageBuilder.addContent(content);
}
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+ // TODO aconway 2007-01-17: Implement heartbeating.
+}
+
void Channel::complete(Message::shared_ptr& msg){
if(exchange){
if(transactional){
@@ -247,7 +255,7 @@
if(msg){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+ msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -258,5 +266,6 @@
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
- msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+ msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
}
+
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Jan 17 22:27:50 2007
@@ -48,83 +48,94 @@
#include <BasicPublishBody.h>
namespace qpid {
- namespace broker {
- using qpid::framing::string;
+namespace broker {
- /**
- * Maintains state for an AMQP channel. Handles incoming and
- * outgoing messages for that channel.
- */
- class Channel : private MessageBuilder::CompletionHandler{
- class ConsumerImpl : public virtual Consumer{
- Channel* parent;
- const string tag;
- Queue::shared_ptr queue;
- ConnectionToken* const connection;
- const bool ackExpected;
- bool blocked;
- public:
- ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
- virtual bool deliver(Message::shared_ptr& msg);
- void cancel();
- void requestDispatch();
- };
-
- typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
- const int id;
- qpid::framing::OutputHandler* out;
- u_int64_t currentDeliveryTag;
- Queue::shared_ptr defaultQueue;
- bool transactional;
- std::map<string, ConsumerImpl*> consumers;
- u_int32_t prefetchSize;
- u_int16_t prefetchCount;
- Prefetch outstanding;
- u_int32_t framesize;
- NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
- qpid::sys::Mutex deliveryLock;
- TxBuffer txBuffer;
- AccumulatedAck accumulatedAck;
- MessageStore* const store;
- MessageBuilder messageBuilder;//builder for in-progress message
- Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
- qpid::framing::ProtocolVersion version; // version used for this channel
- bool isClosed;
-
- virtual void complete(Message::shared_ptr& msg);
- void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void cancel(consumer_iterator consumer);
- bool checkPrefetch(Message::shared_ptr& msg);
+using qpid::framing::string;
+
+/**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
+class Channel : private MessageBuilder::CompletionHandler
+{
+ class ConsumerImpl : public virtual Consumer
+ {
+ Channel* parent;
+ const string tag;
+ Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
+ bool blocked;
+ public:
+ ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ void requestDispatch();
+ };
+
+ typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ u_int16_t id;
+ qpid::framing::OutputHandler& out;
+ u_int64_t currentDeliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ std::map<string, ConsumerImpl*> consumers;
+ u_int32_t prefetchSize;
+ u_int16_t prefetchCount;
+ Prefetch outstanding;
+ u_int32_t framesize;
+ NameGenerator tagGenerator;
+ std::list<DeliveryRecord> unacked;
+ qpid::sys::Mutex deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ MessageStore* const store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
+ qpid::framing::ProtocolVersion version; // version used for this channel
+ bool opened;
+
+ virtual void complete(Message::shared_ptr& msg);
+ void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
+ void cancel(consumer_iterator consumer);
+ bool checkPrefetch(Message::shared_ptr& msg);
- public:
- Channel(qpid::framing::ProtocolVersion& _version, qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
- MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
- ~Channel();
- inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
- inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
- inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
- inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; }
- bool exists(const string& consumerTag);
- void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive,
- ConnectionToken* const connection = 0, const qpid::framing::FieldTable* = 0);
- void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, bool ackExpected);
- void begin();
- void close();
- void commit();
- void rollback();
- void ack(u_int64_t deliveryTag, bool multiple);
- void recover(bool requeue);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
- void handlePublish(Message* msg, Exchange::shared_ptr exchange);
- void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
- };
-
- struct InvalidAckException{};
- }
-}
+ public:
+ Channel(
+ const qpid::framing::ProtocolVersion& _version,
+ qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
+ MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+ ~Channel();
+ bool isOpen() const { return opened; }
+ void open() { opened = true; }
+ u_int16_t getId() const { return id; }
+ void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
+ u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
+ u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; }
+
+ bool exists(const string& consumerTag);
+ void consume(string& tag, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection = 0,
+ const qpid::framing::FieldTable* = 0);
+ void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, bool ackExpected);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ void ack(u_int64_t deliveryTag, bool multiple);
+ void recover(bool requeue);
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
+ void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr);
+};
+
+struct InvalidAckException{};
+
+}} // namespace qpid::broker
#endif /*!_broker_BrokerChannel_h*/
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Wed Jan 17 22:27:50 2007
@@ -23,10 +23,6 @@
#include "Connection.h"
-// TODO aconway 2007-01-16: move to channel.
-#include "Requester.h"
-#include "Responder.h"
-
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -36,9 +32,6 @@
namespace broker {
Connection::Connection(SessionContext* context_, Broker& broker_) :
- adapter(*this),
- requester(broker.getRequester()),
- responder(broker.getResponder()),
context(context_),
framemax(65536),
heartbeat(0),
@@ -65,89 +58,15 @@
return broker.getExchanges().get(name);
}
-void Connection::handleMethod(
- u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
-{
- AMQMethodBody::shared_ptr method =
- shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(adapter, channel);
- }catch(ChannelException& e){
- closeChannel(channel);
- client->getChannel().close(
- channel, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client->getConnection().close(
- 0, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- client->getConnection().close(
- 0, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
void Connection::received(qpid::framing::AMQFrame* frame){
- u_int16_t channel = frame->getChannel();
- AMQBody::shared_ptr body = frame->getBody();
- switch(body->type())
- {
- case REQUEST_BODY:
- responder.received(AMQRequestBody::getData(body));
- handleMethod(channel, body);
- break;
- case RESPONSE_BODY:
- // Must process responses before marking them received.
- handleMethod(channel, body);
- requester.processed(AMQResponseBody::getData(body));
- break;
- // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
- case METHOD_BODY:
- handleMethod(channel, body);
- break;
- case HEADER_BODY:
- handleHeader(
- channel, shared_polymorphic_cast<AMQHeaderBody>(body));
- break;
-
- case CONTENT_BODY:
- handleContent(
- channel, shared_polymorphic_cast<AMQContentBody>(body));
- break;
-
- case HEARTBEAT_BODY:
- assert(channel == 0);
- handleHeartbeat(
- shared_polymorphic_cast<AMQHeartbeatBody>(body));
- break;
- }
-}
-
-/**
- * An OutputHandler that does request/response procssing before
- * delgating to another OutputHandler.
- */
-Connection::Sender::Sender(
- OutputHandler& oh, Requester& req, Responder& resp)
- : out(oh), requester(req), responder(resp)
-{}
-
-void Connection::Sender::send(AMQFrame* frame) {
- AMQBody::shared_ptr body = frame->getBody();
- u_int16_t type = body->type();
- if (type == REQUEST_BODY)
- requester.sending(AMQRequestBody::getData(body));
- else if (type == RESPONSE_BODY)
- responder.sending(AMQResponseBody::getData(body));
- out.send(frame);
+ getAdapter(frame->getChannel()).handleBody(frame->getBody());
}
void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
if (client.get())
- // TODO aconway 2007-01-16: correct code.
+ // TODO aconway 2007-01-16: correct error code.
throw ConnectionException(0, "Connection initiated twice");
-
client.reset(new qpid::framing::AMQP_ClientProxy(
context, header->getMajor(), header->getMinor()));
FieldTable properties;
@@ -159,7 +78,6 @@
mechanisms, locales);
}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
@@ -177,42 +95,29 @@
}
}
-// TODO aconway 2007-01-16: colapse these.
-void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel).handleHeader(body);
+void Connection::closeChannel(u_int16_t channel) {
+ getChannel(channel).close();
+ adapters.erase(adapters.find(channel));
}
-void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel).handleContent(body);
-}
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- std::cout << "Connection::handleHeartbeat()" << std::endl;
-}
-
-void Connection::openChannel(u_int16_t channel) {
- if (channel == 0)
- throw ConnectionException(504, "Illegal channel 0");
- if (channels.find(channel) != channels.end())
- throw ConnectionException(504, "Channel already open: " + channel);
- channels.insert(
- channel,
- new Channel(
- client->getProtocolVersion(), context, channel, framemax,
- broker.getQueues().getStore(), settings.stagingThreshold));
-}
-
-void Connection::closeChannel(u_int16_t channel) {
- getChannel(channel).close(); // throws if channel does not exist.
- channels.erase(channels.find(channel));
+BrokerAdapter& Connection::getAdapter(u_int16_t id) {
+ AdapterMap::iterator i = adapters.find(id);
+ if (i == adapters.end()) {
+ Channel* ch=new Channel(
+ client->getProtocolVersion(), context, id,
+ framemax, broker.getQueues().getStore(),
+ settings.stagingThreshold);
+ BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
+ adapters.insert(id, adapter);
+ return *adapter;
+ }
+ else
+ return *i;
}
-
-Channel& Connection::getChannel(u_int16_t channel){
- ChannelMap::iterator i = channels.find(channel);
- if(i == channels.end())
- throw ConnectionException(504, "Unknown channel: " + channel);
- return *i;
+Channel& Connection::getChannel(u_int16_t id) {
+ return getAdapter(id).getChannel();
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Wed Jan 17 22:27:50 2007
@@ -33,8 +33,8 @@
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
#include "Broker.h"
-#include "BrokerAdapter.h"
#include "Exception.h"
+#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
@@ -50,38 +50,15 @@
class Connection : public qpid::sys::ConnectionInputHandler,
public ConnectionToken
{
- typedef boost::ptr_map<u_int16_t, Channel> ChannelMap;
-
- // TODO aconway 2007-01-16: belongs on broker.
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
- class Sender : public qpid::framing::OutputHandler {
- public:
- Sender(qpid::framing::OutputHandler&,
- qpid::framing::Requester&, qpid::framing::Responder&);
- void send(qpid::framing::AMQFrame* frame);
- private:
- OutputHandler& out;
- qpid::framing::Requester& requester;
- qpid::framing::Responder& responder;
- };
-
- BrokerAdapter adapter;
- // FIXME aconway 2007-01-16: On Channel
- qpid::framing::Requester& requester;
- qpid::framing::Responder& responder;
- ChannelMap channels;
-
- void handleHeader(u_int16_t channel,
- qpid::framing::AMQHeaderBody::shared_ptr body);
- void handleContent(u_int16_t channel,
- qpid::framing::AMQContentBody::shared_ptr body);
- void handleMethod(u_int16_t channel,
- qpid::framing::AMQBody::shared_ptr body);
- void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
// FIXME aconway 2007-01-16: on broker.
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
+
+ BrokerAdapter& getAdapter(u_int16_t id);
+
+ AdapterMap adapters;
public:
Connection(qpid::sys::SessionContext* context, Broker& broker);
@@ -111,10 +88,9 @@
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
-
- void openChannel(u_int16_t channel);
- void closeChannel(u_int16_t channel);
+ Channel& newChannel(u_int16_t channel);
Channel& getChannel(u_int16_t channel);
+ void closeChannel(u_int16_t channel);
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Wed Jan 17 22:27:50 2007
@@ -253,4 +253,5 @@
for(iterator i = channels.begin(); i != channels.end(); i++){
i->second->stop();
}
+ responses.signalResponse(AMQMethodBody::shared_ptr());
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp Wed Jan 17 22:27:50 2007
@@ -29,28 +29,28 @@
qpid::client::ResponseHandler::~ResponseHandler(){}
bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
- return expected.match(response.get());
+ return response != 0 && expected.match(response.get());
}
void qpid::client::ResponseHandler::waitForResponse(){
Monitor::ScopedLock l(monitor);
- if(waiting){
+ while (waiting)
monitor.wait();
- }
}
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
- response = _response;
+void qpid::client::ResponseHandler::signalResponse(
+ qpid::framing::AMQMethodBody::shared_ptr _response)
+{
Monitor::ScopedLock l(monitor);
+ response = _response;
waiting = false;
monitor.notify();
}
void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
Monitor::ScopedLock l(monitor);
- if(waiting){
+ while (waiting)
monitor.wait();
- }
if(!validate(expected)){
THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp Wed Jan 17 22:27:50 2007
@@ -18,38 +18,62 @@
* under the License.
*
*/
-#include <boost/shared_ptr.hpp>
-#include <BodyHandler.h>
+#include "QpidError.h"
+#include "BodyHandler.h"
+#include <AMQRequestBody.h>
+#include <AMQResponseBody.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
using namespace qpid::framing;
using namespace boost;
BodyHandler::~BodyHandler() {}
-void BodyHandler::handleBody(const AMQBody::shared_ptr& body){
-
+void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
switch(body->type())
{
- case METHOD_BODY:
case REQUEST_BODY:
+ handleRequest(shared_polymorphic_cast<AMQRequestBody>(body));
+ break;
case RESPONSE_BODY:
- handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body));
+ handleResponse(shared_polymorphic_cast<AMQResponseBody>(body));
+ break;
+ case METHOD_BODY:
+ handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
break;
-
case HEADER_BODY:
- handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body));
break;
-
case CONTENT_BODY:
- handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ handleContent(shared_polymorphic_cast<AMQContentBody>(body));
break;
-
case HEARTBEAT_BODY:
- handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body));
break;
-
default:
- throw UnknownBodyType(body->type());
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type"); // throw (not merely build) the error; `literal + integer` was pointer arithmetic, not concatenation
}
+}
+void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) {
+ responder.received(request->getData());
+ handleMethod(request);
+}
+
+void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) {
+ handleMethod(response);
+ requester.processed(response->getData());
+}
+
+void BodyHandler::assertChannelZero(u_int16_t id) {
+ if (id != 0)
+ throw ConnectionException(504, "Invalid channel id, not 0");
+}
+
+void BodyHandler::assertChannelNonZero(u_int16_t id) {
+ if (id == 0)
+ throw ConnectionException(504, "Invalid channel id 0");
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h Wed Jan 17 22:27:50 2007
@@ -1,3 +1,6 @@
+#ifndef _BodyHandler_
+#define _BodyHandler_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,37 +21,55 @@
* under the License.
*
*/
-#include <string>
-#ifndef _BodyHandler_
-#define _BodyHandler_
+#include <boost/shared_ptr.hpp>
-#include <AMQMethodBody.h>
-#include <AMQHeaderBody.h>
-#include <AMQContentBody.h>
-#include <AMQHeartbeatBody.h>
+#include "Requester.h"
+#include "Responder.h"
namespace qpid {
namespace framing {
- class BodyHandler{
- public:
- virtual ~BodyHandler();
- virtual void handleMethod(AMQMethodBody::shared_ptr body) = 0;
- virtual void handleHeader(AMQHeaderBody::shared_ptr body) = 0;
- virtual void handleContent(AMQContentBody::shared_ptr body) = 0;
- virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0;
-
- void handleBody(const AMQBody::shared_ptr& body);
- };
-
- class UnknownBodyType{
- public:
- const u_int16_t type;
- inline UnknownBodyType(u_int16_t _type) : type(_type){}
- };
-}
-}
+class AMQRequestBody;
+class AMQResponseBody;
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeartbeatBody;
+
+/**
+ * Base class for client and broker channel handlers.
+ *
+ * Handles request/response id management common to client and broker.
+ * Derived classes provide remaining client/broker specific handling.
+ */
+class BodyHandler {
+ public:
+ virtual ~BodyHandler();
+
+ void handleBody(boost::shared_ptr<AMQBody> body);
+
+ protected:
+ virtual void handleRequest(boost::shared_ptr<AMQRequestBody>);
+ virtual void handleResponse(boost::shared_ptr<AMQResponseBody>);
+
+ virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
+
+ protected:
+ /** Throw protocol exception if this is not channel 0. */
+ static void assertChannelZero(u_int16_t id);
+ /** Throw protocol exception if this is channel 0. */
+ static void assertChannelNonZero(u_int16_t id);
+
+ private:
+ Requester requester;
+ Responder responder;
+};
+
+}}
#endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Wed Jan 17 22:27:50 2007
@@ -135,6 +135,7 @@
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0);
+ channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
ConnectionToken* owner = 0;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Wed Jan 17 22:27:50 2007
@@ -41,7 +41,6 @@
ValueTest
framing_tests = \
- BodyHandlerTest \
FieldTableTest \
FramingTest \
HeaderTest
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker?view=diff&rev=497319&r1=497318&r2=497319
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker Wed Jan 17 22:27:50 2007
@@ -8,3 +8,7 @@
# Start the daemon, recording its PID.
../src/qpidd > $LOG 2>&1 & echo $! > $PID
+
+# FIXME aconway 2007-01-18: qpidd should not return till it is accepting
+# connections, remove arbitrary sleep.
+sleep 1