You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/02 23:03:12 UTC
svn commit: r502767 [1/2] - in /incubator/qpid/branches/qpid.0-9:
cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/framing/ cpp/tests/
gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
Author: aconway
Date: Fri Feb 2 14:03:10 2007
New Revision: 502767
URL: http://svn.apache.org/viewvc?view=rev&rev=502767
Log:
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. The request ID comes from the body;
ChannelAdapter, not OutputHandler, is used to send frames.
* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
Context is per-method not per-channel.
* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
with MethodContext (for responses) or ChannelAdapter (for requests).
Use context request-ID to construct responses, send all bodies via
ChannelAdapter.
* cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter.
* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&
* Cosmetic changes, many files:
- fixed indentation, broke long lines.
- removed unnecessary qpid:: prefixes.
* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
broker::Channel.
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/NullMessageStore.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersion.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/ExchangeTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/AmqpMethod.java
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Fri Feb 2 14:03:10 2007
@@ -54,7 +54,7 @@
factory(*this)
{
if (config.getStore().empty())
- store.reset(new NullMessageStore());
+ store.reset(new NullMessageStore(config.isTrace()));
else
store.reset(new MessageStoreModule(config.getStore()));
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Fri Feb 2 14:03:10 2007
@@ -28,168 +28,20 @@
using namespace qpid;
using namespace qpid::framing;
-typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+typedef std::vector<Queue::shared_ptr> QueueVector;
-class BrokerAdapter::ServerOps : public AMQP_ServerOperations
-{
- public:
- ServerOps(Channel& ch, Connection& c, Broker& b) :
- basicHandler(ch, c, b),
- channelHandler(ch, c, b),
- connectionHandler(ch, c, b),
- exchangeHandler(ch, c, b),
- messageHandler(ch, c, b),
- queueHandler(ch, c, b),
- txHandler(ch, c, b)
- {}
-
- ChannelHandler* getChannelHandler() { return &channelHandler; }
- ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- AccessHandler* getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw ConnectionException(540, "File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented"); }
-
- private:
- struct CoreRefs {
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
-
- Channel& channel;
- Connection& connection;
- Broker& broker;
- };
-
- class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
- public:
- ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
- void startOk(const MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const MethodContext& context, const std::string& response);
- void tuneOk(const MethodContext& context, u_int16_t channelMax,
- u_int32_t frameMax, u_int16_t heartbeat);
- void open(const MethodContext& context, const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const MethodContext& context, u_int16_t replyCode,
- const std::string& replyText,
- u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
- public:
- ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(const MethodContext& context, const std::string& outOfBand);
- void flow(const MethodContext& context, bool active);
- void flowOk(const MethodContext& context, bool active);
- void ok( const MethodContext& context );
- void ping( const MethodContext& context );
- void pong( const MethodContext& context );
- void resume( const MethodContext& context, const std::string& channelId );
- void close(const MethodContext& context, u_int16_t replyCode, const
- std::string& replyText, u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
- public:
- ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void delete_(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
- };
-
- class QueueHandlerImpl : private CoreRefs, public QueueHandler{
- public:
- QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(const MethodContext& context,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool nowait);
- void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
- };
-
- class BasicHandlerImpl : private CoreRefs, public BasicHandler{
- public:
- BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(const MethodContext& context, u_int32_t prefetchSize,
- u_int16_t prefetchCount, bool global);
- void consume(
- const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const MethodContext& context, const std::string& consumerTag,
- bool nowait);
- void publish(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
- void get(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool noAck);
- void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple);
- void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue);
- void recover(const MethodContext& context, bool requeue);
- };
-
- class TxHandlerImpl : private CoreRefs, public TxHandler{
- public:
- TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void select(const MethodContext& context);
- void commit(const MethodContext& context);
- void rollback(const MethodContext& context);
- };
-
- BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
- ExchangeHandlerImpl exchangeHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
-
-};
-
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext& context , const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
connection.client->getConnection().tune(
context, 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+ const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -197,12 +49,12 @@
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
connection.client->getConnection().openOk(context, knownhosts);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
@@ -210,21 +62,21 @@
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
connection.client->getChannel().openOk(context, std::string()/* ID */);
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
@@ -234,13 +86,13 @@
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -265,17 +117,17 @@
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
@@ -308,9 +160,9 @@
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -325,7 +177,7 @@
}
void
-BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
+BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -344,15 +196,15 @@
connection.client->getQueue().unbindOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -363,7 +215,7 @@
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&connection)){
- queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+ QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
count = q->getMessageCount();
@@ -377,14 +229,14 @@
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
connection.client->getBasic().qosOk(context);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -412,19 +264,23 @@
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+ const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate)
+{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(
+ &connection, exchangeName, routingKey, mandatory, immediate,
+ context.methodBody);
channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
@@ -432,7 +288,7 @@
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -441,7 +297,7 @@
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -449,23 +305,23 @@
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
connection.client->getTx().rollbackOk(context);
@@ -473,82 +329,32 @@
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
connection.client->getChannel().ok(context);
connection.client->getChannel().pong(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
connection.client->getChannel().ok(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-BrokerAdapter::BrokerAdapter(
- std::auto_ptr<Channel> ch, Connection& c, Broker& b
-) :
- channel(ch),
- connection(c),
- broker(b),
- serverOps(new ServerOps(*channel,c,b))
-{
- init(channel->getId(), c.getOutput(), channel->getVersion());
-}
-
-void BrokerAdapter::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- method->invoke(*serverOps, context);
- }catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.client->getConnection().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.client->getConnection().close(
- context, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
- channel->handleHeader(body);
-}
-
-void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
- channel->handleContent(body);
-}
-
-void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
- // TODO aconway 2007-01-17: Implement heartbeats.
-}
-
-
-bool BrokerAdapter::isOpen() const {
- return channel->isOpen();
-}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Fri Feb 2 14:03:10 2007
@@ -18,18 +18,14 @@
* limitations under the License.
*
*/
-#include <memory.h>
-
#include "AMQP_ServerOperations.h"
-#include "BodyHandler.h"
+#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
-#include "amqp_types.h"
-#include "framing/ChannelAdapter.h"
namespace qpid {
namespace broker {
-class AMQMethodBody;
+class Channel;
class Connection;
class Broker;
@@ -38,35 +34,173 @@
*
* Translates protocol bodies into calls on the core Channel,
* Connection and Broker objects.
- *
- * Owns a channel, has references to Connection and Broker.
*/
-class BrokerAdapter : public framing::ChannelAdapter
-{
- public:
- BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&);
- Channel& getChannel() { return *channel; }
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+class ChannelHandler;
+class ConnectionHandler;
+class BasicHandler;
+class ExchangeHandler;
+class QueueHandler;
+class TxHandler;
+class MessageHandler;
+class AccessHandler;
+class FileHandler;
+class StreamHandler;
+class DtxHandler;
+class TunnelHandler;
- bool isOpen() const;
+class BrokerAdapter : public framing::AMQP_ServerOperations
+{
+ public:
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ basicHandler(ch, c, b),
+ channelHandler(ch, c, b),
+ connectionHandler(ch, c, b),
+ exchangeHandler(ch, c, b),
+ messageHandler(ch, c, b),
+ queueHandler(ch, c, b),
+ txHandler(ch, c, b)
+ {}
+ ChannelHandler* getChannelHandler() { return &channelHandler; }
+ ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+ BasicHandler* getBasicHandler() { return &basicHandler; }
+ ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ QueueHandler* getQueueHandler() { return &queueHandler; }
+ TxHandler* getTxHandler() { return &txHandler; }
+ MessageHandler* getMessageHandler() { return &messageHandler; }
+ AccessHandler* getAccessHandler() {
+ throw ConnectionException(540, "Access class not implemented"); }
+ FileHandler* getFileHandler() {
+ throw ConnectionException(540, "File class not implemented"); }
+ StreamHandler* getStreamHandler() {
+ throw ConnectionException(540, "Stream class not implemented"); }
+ DtxHandler* getDtxHandler() {
+ throw ConnectionException(540, "Dtx class not implemented"); }
+ TunnelHandler* getTunnelHandler() {
+ throw ConnectionException(540, "Tunnel class not implemented"); }
+
private:
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
+ struct CoreRefs {
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b) {}
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ };
- class ServerOps;
+ class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
+ public:
+ ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+
+ void startOk(const framing::MethodContext& context,
+ const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const framing::MethodContext& context,
+ const std::string& response);
+ void tuneOk(const framing::MethodContext& context,
+ u_int16_t channelMax,
+ u_int32_t frameMax, u_int16_t heartbeat);
+ void open(const framing::MethodContext& context,
+ const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(const framing::MethodContext& context, u_int16_t replyCode,
+ const std::string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
+
+ class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
+ public:
+ ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void open(const framing::MethodContext& context, const std::string& outOfBand);
+ void flow(const framing::MethodContext& context, bool active);
+ void flowOk(const framing::MethodContext& context, bool active);
+ void ok( const framing::MethodContext& context );
+ void ping( const framing::MethodContext& context );
+ void pong( const framing::MethodContext& context );
+ void resume( const framing::MethodContext& context, const std::string& channelId );
+ void close(const framing::MethodContext& context, u_int16_t replyCode, const
+ std::string& replyText, u_int16_t classId, u_int16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
+
+ class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
+ public:
+ ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, const std::string& type,
+ bool passive, bool durable, bool autoDelete,
+ bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, bool ifUnused, bool nowait);
+ };
+
+ class QueueHandlerImpl : private CoreRefs, public QueueHandler{
+ public:
+ QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void bind(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ bool nowait, const qpid::framing::FieldTable& arguments);
+ void unbind(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments );
+ void purge(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool nowait);
+ void delete_(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool ifUnused, bool ifEmpty,
+ bool nowait);
+ };
+
+ class BasicHandlerImpl : private CoreRefs, public BasicHandler{
+ public:
+ BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void qos(const framing::MethodContext& context, u_int32_t prefetchSize,
+ u_int16_t prefetchCount, bool global);
+ void consume(
+ const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ const std::string& consumerTag, bool noLocal, bool noAck,
+ bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(const framing::MethodContext& context, const std::string& consumerTag,
+ bool nowait);
+ void publish(const framing::MethodContext& context, u_int16_t ticket,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ void get(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue,
+ bool noAck);
+ void ack(const framing::MethodContext& context, u_int64_t deliveryTag, bool multiple);
+ void reject(const framing::MethodContext& context, u_int64_t deliveryTag, bool requeue);
+ void recover(const framing::MethodContext& context, bool requeue);
+ };
+
+ class TxHandlerImpl : private CoreRefs, public TxHandler{
+ public:
+ TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
+ void select(const framing::MethodContext& context);
+ void commit(const framing::MethodContext& context);
+ void rollback(const framing::MethodContext& context);
+ };
+
+ BasicHandlerImpl basicHandler;
+ ChannelHandlerImpl channelHandler;
+ ConnectionHandlerImpl connectionHandler;
+ ExchangeHandlerImpl exchangeHandler;
+ MessageHandlerImpl messageHandler;
+ QueueHandlerImpl queueHandler;
+ TxHandlerImpl txHandler;
- std::auto_ptr<Channel> channel;
- Connection& connection;
- Broker& broker;
- boost::shared_ptr<ServerOps> serverOps;
};
-
-
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Fri Feb 2 14:03:10 2007
@@ -18,12 +18,25 @@
* under the License.
*
*/
+#include <assert.h>
+
#include <iostream>
#include <sstream>
-#include <assert.h>
+#include <algorithm>
+#include <functional>
-#include <BrokerChannel.h>
+#include "BrokerChannel.h"
+#include "DeletingTxOp.h"
+#include "framing/ChannelAdapter.h"
#include <QpidError.h>
+#include <DeliverableMessage.h>
+#include <BrokerQueue.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <TxAck.h>
+#include <TxPublish.h>
+#include "BrokerAdapter.h"
+#include "Connection.h"
using std::mem_fun_ref;
using std::bind2nd;
@@ -33,12 +46,12 @@
Channel::Channel(
- const ProtocolVersion& _version, OutputHandler* _out, int _id,
+ Connection& con, ChannelId id,
u_int32_t _framesize, MessageStore* const _store,
u_int64_t _stagingThreshold
) :
- id(_id),
- out(*_out),
+ ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()),
+ connection(con),
currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
@@ -47,8 +60,8 @@
tagGenerator("sgen"),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version),
- opened(false)
+ opened(true),
+ adapter(new BrokerAdapter(*this, con, con.broker))
{
outstanding.reset();
}
@@ -61,7 +74,10 @@
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) {
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection,
+ const FieldTable*)
+{
if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
@@ -117,7 +133,10 @@
accumulatedAck.clear();
}
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+void Channel::deliver(
+ Message::shared_ptr& msg, const string& consumerTag,
+ Queue::shared_ptr& queue, bool ackExpected)
+{
Mutex::ScopedLock locker(deliveryLock);
u_int64_t deliveryTag = currentDeliveryTag++;
@@ -127,7 +146,7 @@
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -184,7 +203,7 @@
messageBuilder.addContent(content);
}
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
@@ -255,7 +274,9 @@
if(msg){
Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(&out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize, &version);
+ msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ queue->getMessageCount() + 1, myDeliveryTag,
+ framesize);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -265,7 +286,32 @@
}
}
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
- msg->deliver(&out, id, consumerTag, deliveryTag, framesize, &version);
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ u_int64_t deliveryTag)
+{
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+void Channel::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*adapter, context);
+ }catch(ChannelException& e){
+ connection.client->getChannel().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ context, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Fri Feb 2 14:03:10 2007
@@ -22,42 +22,39 @@
*
*/
-#include <algorithm>
-#include <functional>
#include <list>
#include <map>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
#include <AccumulatedAck.h>
-#include <Binding.h>
#include <Consumer.h>
-#include <DeletingTxOp.h>
-#include <DeliverableMessage.h>
#include <DeliveryRecord.h>
-#include <BrokerMessage.h>
#include <MessageBuilder.h>
#include <NameGenerator.h>
#include <Prefetch.h>
-#include <BrokerQueue.h>
-#include <MessageStore.h>
-#include <TxAck.h>
#include <TxBuffer.h>
-#include <TxPublish.h>
-#include <sys/Monitor.h>
-#include <OutputHandler.h>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include <AMQHeartbeatBody.h>
-#include <BasicPublishBody.h>
+#include "framing/ChannelAdapter.h"
namespace qpid {
namespace broker {
-using qpid::framing::string;
+class ConnectionToken;
+class Connection;
+class Queue;
+class BrokerAdapter;
+
+using framing::string;
/**
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : private MessageBuilder::CompletionHandler {
+class Channel :
+ public framing::ChannelAdapter,
+ private MessageBuilder::CompletionHandler
+{
class ConsumerImpl : public virtual Consumer
{
Channel* parent;
@@ -67,15 +64,18 @@
const bool ackExpected;
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ ConsumerImpl(Channel* parent, const string& tag,
+ Queue::shared_ptr queue,
+ ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
void requestDispatch();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+ Connection& connection;
u_int16_t id;
- qpid::framing::OutputHandler& out;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
@@ -86,30 +86,32 @@
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::sys::Mutex deliveryLock;
+ sys::Mutex deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
- qpid::framing::ProtocolVersion version; // version used for this channel
bool opened;
+ boost::scoped_ptr<BrokerAdapter> adapter;
+
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(
- const qpid::framing::ProtocolVersion& _version,
- qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
- MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
+ Channel(Connection& channel,
+ framing::ChannelId id,
+ u_int32_t framesize,
+ MessageStore* const _store = 0,
+ u_int64_t stagingThreshold = 0);
+
~Channel();
+
bool isOpen() const { return opened; }
- const framing::ProtocolVersion& getVersion() const { return version; }
void open() { opened = true; }
- u_int16_t getId() const { return id; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
@@ -118,7 +120,7 @@
bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
- const qpid::framing::FieldTable* = 0);
+ const framing::FieldTable* = 0);
void cancel(const string& tag);
bool get(Queue::shared_ptr queue, bool ackExpected);
void begin();
@@ -129,14 +131,18 @@
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
void handlePublish(Message* msg, Exchange::shared_ptr exchange);
- void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr);
- void handleContent(qpid::framing::AMQContentBody::shared_ptr);
- void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr);
+ void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+ void handleMethodInContext(
+ boost::shared_ptr<framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
};
struct InvalidAckException{};
-}} // namespace qpid::broker
+}} // namespace qpid::broker
#endif /*!_broker_BrokerChannel_h*/
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Fri Feb 2 14:03:10 2007
@@ -27,31 +27,35 @@
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate),
- publisher(_publisher),
- size(0)
+BasicMessage::BasicMessage(
+ const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
+ publisher(_publisher),
+ size(0)
{
}
-BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), size(0)
-{
+// FIXME aconway 2007-02-01: remove.
+// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+// publisher(0), size(0)
+// {
- decode(buffer, headersOnly, contentChunkSize);
-}
+// decode(buffer, headersOnly, contentChunkSize);
+// }
+// For tests only.
BasicMessage::BasicMessage() : publisher(0), size(0)
-{
-}
+{}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -73,34 +77,42 @@
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel,
- new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
- sendContent(out, channel, framesize, version);
-}
-
-void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(*version, channel, headerBody));
+void BasicMessage::deliver(ChannelAdapter& channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra construction
+ channel.send(
+ new BasicDeliverBody(
+ channel.getVersion(), consumerTag, deliveryTag,
+ getRedelivered(), getExchange(), getRoutingKey()));
+ sendContent(channel, framesize);
+}
+
+void BasicMessage::sendGetOk(const MethodContext& context,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra construction
+ context.channel->send(
+ new BasicGetOkBody(
+ context.channel->getVersion(),
+ context.methodBody->getRequestId(),
+ deliveryTag, getRedelivered(), getExchange(),
+ getRoutingKey(), messageCount));
+ sendContent(*context.channel, framesize);
+}
+void BasicMessage::sendContent(
+ ChannelAdapter& channel, u_int32_t framesize)
+{
+ channel.send(header);
Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->send(*version, out, channel, framesize);
+ if (content.get())
+ content->send(channel, framesize);
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
@@ -126,8 +138,8 @@
void BasicMessage::decodeHeader(Buffer& buffer)
{
- string exchange;
- string routingKey;
+ string exchange;
+ string routingKey;
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Fri Feb 2 14:03:10 2007
@@ -27,109 +27,111 @@
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
-#include <ProtocolVersion.h>
+#include "AMQMethodBody.h"
#include <BasicHeaderProperties.h>
#include <ConnectionToken.h>
#include <Content.h>
-#include <OutputHandler.h>
#include <Mutex.h>
#include <TxBuffer.h>
namespace qpid {
- namespace broker {
- class MessageStore;
- using qpid::framing::string;
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+}
+
+namespace broker {
+
+class MessageStore;
+using framing::string;
- /**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
- class BasicMessage : public Message{
- const ConnectionToken* const publisher;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::auto_ptr<Content> content;
- qpid::sys::Mutex contentLock;
- u_int64_t size;
-
- void sendContent(qpid::framing::OutputHandler* out,
- int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
-
- public:
- typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
- BasicMessage(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
- BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- BasicMessage();
- ~BasicMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
- bool isComplete();
- const ConnectionToken* const getPublisher();
-
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
-
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- u_int64_t contentSize() const { return size; }
-
- void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- void decodeHeader(qpid::framing::Buffer& buffer);
- void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
-
- void encode(qpid::framing::Buffer& buffer);
- void encodeHeader(qpid::framing::Buffer& buffer);
- void encodeContent(qpid::framing::Buffer& buffer);
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- u_int32_t encodedSize();
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- u_int32_t encodedHeaderSize();
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- u_int32_t encodedContentSize();
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- u_int64_t expectedContentSize();
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- void setContent(std::auto_ptr<Content>& content);
- };
+/**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+class BasicMessage : public Message {
+ const ConnectionToken* const publisher;
+ framing::AMQHeaderBody::shared_ptr header;
+ std::auto_ptr<Content> content;
+ sys::Mutex contentLock;
+ u_int64_t size;
+
+ void sendContent(framing::ChannelAdapter&, u_int32_t framesize);
+
+ public:
+ typedef boost::shared_ptr<BasicMessage> shared_ptr;
+
+ BasicMessage(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate,
+ framing::AMQMethodBody::shared_ptr respondTo);
+ BasicMessage();
+ ~BasicMessage();
+ void setHeader(framing::AMQHeaderBody::shared_ptr header);
+ void addContent(framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(framing::ChannelAdapter&,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ void sendGetOk(const framing::MethodContext&,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ u_int64_t contentSize() const { return size; }
+
+ void decode(framing::Buffer& buffer, bool headersOnly = false,
+ u_int32_t contentChunkSize = 0);
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
+
+ void encode(framing::Buffer& buffer);
+ void encodeHeader(framing::Buffer& buffer);
+ void encodeContent(framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ u_int32_t encodedSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g. routing key and exchange)
+ */
+ u_int32_t encodedHeaderSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ u_int32_t encodedContentSize();
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ u_int64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
+};
- }
+}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Fri Feb 2 14:03:10 2007
@@ -24,146 +24,148 @@
#include "AMQContentBody.h"
#include "AMQHeaderBody.h"
+#include "AMQMethodBody.h"
#include "Content.h"
+#include "framing/amqp_types.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace qpid {
- namespace framing {
- class OutputHandler;
- class ProtocolVersion;
- class BasicHeaderProperties;
- }
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+class BasicHeaderProperties;
+}
- namespace broker {
+namespace broker {
- class MessageStore;
- class ConnectionToken;
+class MessageStore;
+class ConnectionToken;
- /**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
- class Message{
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- u_int64_t persistenceId;
-
- bool redelivered;
-
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
-
- Message(const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- persistenceId(0),
- redelivered(false)
- {}
-
- Message() :
- mandatory(false),
- immediate(false),
- persistenceId(0),
- redelivered(false)
- {}
-
- virtual ~Message() {};
-
- // Accessors
- const std::string& getRoutingKey() const { return routingKey; }
- const std::string& getExchange() const { return exchange; }
- u_int64_t getPersistenceId() const { return persistenceId; }
- bool getRedelivered() const { return redelivered; }
-
- void setRouting(const std::string& _exchange, const std::string& _routingKey)
- { exchange = _exchange; routingKey = _routingKey; }
- void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
- void redeliver() { redelivered = true; }
-
- /**
- * Used to deliver the message from the queue
- */
- virtual void deliver(qpid::framing::OutputHandler* out,
- int channel,
+/**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+class Message{
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+ bool redelivered;
+ framing::AMQMethodBody::shared_ptr respondTo;
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ Message(const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate,
+ framing::AMQMethodBody::shared_ptr respondTo_) :
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ persistenceId(0),
+ redelivered(false),
+ respondTo(respondTo_)
+ {}
+
+ Message() :
+ mandatory(false),
+ immediate(false),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
+ virtual ~Message() {};
+
+ // Accessors
+ const std::string& getRoutingKey() const { return routingKey; }
+ const std::string& getExchange() const { return exchange; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ bool getRedelivered() const { return redelivered; }
+ framing::AMQMethodBody::shared_ptr getRespondTo() const {
+ return respondTo;
+ }
+
+ void setRouting(const std::string& _exchange, const std::string& _routingKey)
+ { exchange = _exchange; routingKey = _routingKey; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+ void redeliver() { redelivered = true; }
+
+ /**
+ * Used to deliver the message from the queue
+ */
+ virtual void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
- /**
- * Used to return a message in response to a get from a queue
- */
- virtual void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
+ u_int32_t framesize) = 0;
+ /**
+ * Used to return a message in response to a get from a queue
+ */
+ virtual void sendGetOk(const framing::MethodContext& context,
u_int32_t messageCount,
u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
+ u_int32_t framesize) = 0;
- virtual bool isComplete() = 0;
+ virtual bool isComplete() = 0;
- virtual u_int64_t contentSize() const = 0;
- virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual bool isPersistent() = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
-
- virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
- virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedHeaderSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual u_int32_t encodedContentSize() = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual u_int64_t expectedContentSize() = 0;
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
- virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
- };
+ virtual u_int64_t contentSize() const = 0;
+ virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+
+ virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g. routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {};
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+};
- }
-}
+}}
#endif /*!_broker_BrokerMessage_h*/
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Fri Feb 2 14:03:10 2007
@@ -22,29 +22,28 @@
using namespace qpid::broker;
-MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody,
- const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate),
- methodBody(_methodBody)
+MessageMessage::MessageMessage(
+ const qpid::framing::AMQMethodBody::shared_ptr _methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+ methodBody(_methodBody)
{
}
-void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/,
- int /*channel*/,
- const std::string& /*consumerTag*/,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/,
- qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::deliver(
+ framing::ChannelAdapter& /*out*/,
+ const std::string& /*consumerTag*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/)
{
}
-void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/,
- int /*channel*/,
- u_int32_t /*messageCount*/,
- u_int64_t /*deliveryTag*/,
- u_int32_t /*framesize*/,
- qpid::framing::ProtocolVersion* /*version*/)
+void MessageMessage::sendGetOk(
+ const framing::MethodContext& /*context*/,
+ u_int32_t /*messageCount*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/)
{
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Fri Feb 2 14:03:10 2007
@@ -25,47 +25,46 @@
#include "BrokerMessageBase.h"
namespace qpid {
- namespace framing {
- class AMQMethodBody;
- }
+namespace framing {
+class AMQMethodBody;
+}
- namespace broker {
- class MessageMessage: public Message{
- const qpid::framing::AMQMethodBody& methodBody;
+namespace broker {
+class MessageMessage: public Message{
+ const qpid::framing::AMQMethodBody::shared_ptr methodBody;
- public:
- MessageMessage(const qpid::framing::AMQMethodBody& methodBody,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ public:
+ MessageMessage(
+ const framing::AMQMethodBody::shared_ptr methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
- // Default destructor okay
+ // Default destructor okay
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- bool isComplete();
+ void deliver(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ void sendGetOk(const framing::MethodContext& context,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ bool isComplete();
- u_int64_t contentSize() const;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- const ConnectionToken* const getPublisher();
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const ConnectionToken* const getPublisher();
- u_int32_t encodedSize();
- u_int32_t encodedHeaderSize();
- u_int32_t encodedContentSize();
- u_int64_t expectedContentSize();
- };
+ u_int32_t encodedSize();
+ u_int32_t encodedHeaderSize();
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+};
- }
+}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Fri Feb 2 14:03:10 2007
@@ -60,9 +60,11 @@
void Connection::received(qpid::framing::AMQFrame* frame){
- getAdapter(frame->getChannel()).handleBody(frame->getBody());
+ getChannel(frame->getChannel()).handleBody(frame->getBody());
}
+// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
+// as it is part of the protocol.
void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
if (client.get())
// TODO aconway 2007-01-16: correct error code.
@@ -72,12 +74,11 @@
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- // TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- MethodContext(0, &getAdapter(0)),
+ MethodContext(&getChannel(0)),
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
- getAdapter(0).init(0, *out, client->getProtocolVersion());
+ getChannel(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
@@ -99,28 +100,19 @@
void Connection::closeChannel(u_int16_t channel) {
getChannel(channel).close();
- adapters.erase(adapters.find(channel));
+ channels.erase(channels.find(channel));
}
-BrokerAdapter& Connection::getAdapter(u_int16_t id) {
- AdapterMap::iterator i = adapters.find(id);
- if (i == adapters.end()) {
- std::auto_ptr<Channel> ch(
- new Channel(
- client->getProtocolVersion(), out, id,
- framemax, broker.getQueues().getStore(),
- settings.stagingThreshold));
- BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
- adapters.insert(id, adapter);
- return *adapter;
- }
- else
- return *i;
-}
-
-Channel& Connection::getChannel(u_int16_t id) {
- return getAdapter(id).getChannel();
+Channel& Connection::getChannel(ChannelId id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i == channels.end()) {
+ i = channels.insert(
+ id, new Channel(
+ *this, id, framemax, broker.getQueues().getStore(),
+ settings.stagingThreshold)).first;
+ }
+ return *i;
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Fri Feb 2 14:03:10 2007
@@ -34,7 +34,6 @@
#include <sys/TimeoutHandler.h>
#include "Broker.h"
#include "Exception.h"
-#include "BrokerAdapter.h"
namespace qpid {
namespace broker {
@@ -47,19 +46,22 @@
Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
};
-class Connection : public qpid::sys::ConnectionInputHandler,
+class Connection : public sys::ConnectionInputHandler,
public ConnectionToken
{
public:
- Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker);
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker);
// ConnectionInputHandler methods
- void received(qpid::framing::AMQFrame* frame);
- void initiated(qpid::framing::ProtocolInitiation* header);
+ void received(framing::AMQFrame* frame);
+ void initiated(framing::ProtocolInitiation* header);
void idleOut();
void idleIn();
void closed();
- qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
+ sys::ConnectionOutputHandler& getOutput() { return *out; }
+
+ const framing::ProtocolVersion& getVersion() {
+ return client->getProtocolVersion(); }
u_int32_t getFrameMax() const { return framemax; }
u_int16_t getHeartbeat() const { return heartbeat; }
@@ -68,7 +70,7 @@
void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
Broker& broker;
- std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
+ std::auto_ptr<framing::AMQP_ClientProxy> client;
Settings settings;
std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -81,20 +83,18 @@
*/
Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
- Channel& newChannel(u_int16_t channel);
- Channel& getChannel(u_int16_t channel);
- void closeChannel(u_int16_t channel);
+ Channel& newChannel(framing::ChannelId channel);
+ Channel& getChannel(framing::ChannelId channel);
+ void closeChannel(framing::ChannelId channel);
private:
- typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
+ typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
- BrokerAdapter& getAdapter(u_int16_t id);
-
- AdapterMap adapters;
- qpid::sys::ConnectionOutputHandler* out;
+ ChannelMap channels;
+ sys::ConnectionOutputHandler* out;
u_int32_t framemax;
u_int16_t heartbeat;
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Fri Feb 2 14:03:10 2007
@@ -24,21 +24,25 @@
#include <AMQContentBody.h>
#include <Buffer.h>
#include <OutputHandler.h>
-#include <ProtocolVersion.h>
namespace qpid {
- namespace broker {
- class Content{
- public:
- virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
- virtual u_int32_t size() = 0;
- virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
- virtual void encode(qpid::framing::Buffer& buffer) = 0;
- virtual void destroy() = 0;
- virtual ~Content(){}
- };
- }
+
+namespace framing {
+class ChannelAdapter;
}
+
+namespace broker {
+class Content{
+ public:
+ virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+ virtual u_int32_t size() = 0;
+ virtual void send(framing::ChannelAdapter& channel,
+ u_int32_t framesize) = 0;
+ virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual void destroy() = 0;
+ virtual ~Content(){}
+};
+}}
#endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp Fri Feb 2 14:03:10 2007
@@ -20,6 +20,7 @@
*/
#include <InMemoryContent.h>
#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -39,24 +40,26 @@
return sum;
}
-void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+// NOTE aconway 2007-02-01: version parameter removed; content bodies are now sent through the ChannelAdapter.
+void InMemoryContent::send(ChannelAdapter& channel, u_int32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
if ((*i)->size() > framesize) {
u_int32_t offset = 0;
for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
string data = (*i)->getData().substr(offset, framesize);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
offset += framesize;
}
u_int32_t remainder = (*i)->size() % framesize;
if (remainder) {
string data = (*i)->getData().substr(offset, remainder);
- out->send(new AMQFrame(version, channel, new AMQContentBody(data)));
+ channel.send(new AMQContentBody(data));
}
} else {
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(version, channel, contentBody));
+ AMQBody::shared_ptr contentBody =
+ static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ channel.send(contentBody);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h?view=diff&rev=502767&r1=502766&r2=502767
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h Fri Feb 2 14:03:10 2007
@@ -35,7 +35,7 @@
public:
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(framing::ChannelAdapter&, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~InMemoryContent(){}