You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/19 22:33:30 UTC
svn commit: r497963 - in /incubator/qpid/branches/qpid.0-9: cpp/
cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/ cpp/lib/common/framing/
cpp/lib/common/sys/ cpp/lib/common/sys/apr/ cpp/lib/common/sys/posix/
cpp/tests/ gentools/src/org/apache/qpid/gento...
Author: aconway
Date: Fri Jan 19 13:33:27 2007
New Revision: 497963
URL: http://svn.apache.org/viewvc?view=rev&rev=497963
Log:
Last big refactoring for 0-9 framing. Still need additional tests &
debugging but the overall structure is all in place.
* configure.ac: Added -Wno_virtual_overload warning
* ChannelTest.cpp, MessageBuilderTest.cpp: Fixed virtual overload warnings.
* ChannelAdapter.cpp: Common base for client/broker adapters.
Creates invocation context, handles request/resposne IDs.
* CppGenerator.java:
- Proxies send methods using MethodContext.
* Various .h files: removed unnecessary #includes, added to requred .cpp files.
* ConnectionContext: renamed from SessionContext.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionOutputHandler.h
- copied, changed from r497431, incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/SessionContext.h
Removed:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/SessionContext.h
Modified:
incubator/qpid/branches/qpid.0-9/cpp/configure.ac
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/OutputHandler.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/LFSessionContext.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelConnection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/check.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MockConnectionInputHandler.h
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
Modified: incubator/qpid/branches/qpid.0-9/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/configure.ac?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/configure.ac (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/configure.ac Fri Jan 19 13:33:27 2007
@@ -66,6 +66,7 @@
gl_COMPILER_FLAGS(-Wvolatile-register-var)
gl_COMPILER_FLAGS(-Winvalid-pch)
gl_COMPILER_FLAGS(-Wno-system-headers)
+ gl_COMPILER_FLAGS(-Woverloaded-virtual)
AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS])
AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.])
COMPILER_FLAGS=
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Fri Jan 19 13:33:27 2007
@@ -30,7 +30,6 @@
#include "NullMessageStore.h"
#include "ProtocolInitiation.h"
#include "Connection.h"
-#include "sys/SessionContext.h"
#include "sys/ConnectionInputHandler.h"
#include "sys/ConnectionInputHandlerFactory.h"
#include "sys/TimeoutHandler.h"
@@ -97,7 +96,9 @@
acceptor->shutdown();
}
-Broker::~Broker() { }
+Broker::~Broker() {
+ shutdown();
+}
const int16_t Broker::DEFAULT_PORT(5672);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Fri Jan 19 13:33:27 2007
@@ -181,9 +181,9 @@
};
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
+ connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
@@ -193,40 +193,40 @@
connection.heartbeat = heartbeat;
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
- connection.client->getConnection().openOk(0, knownhosts);
+ connection.client->getConnection().openOk(context, knownhosts);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
- const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(0);
- connection.context->close();
+ connection.client->getConnection().closeOk(context);
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.context->close();
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- const MethodContext&, const string& /*outOfBand*/){
+ const MethodContext& context, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
assertChannelNonZero(channel.getId());
if (channel.isOpen())
throw ConnectionException(504, "Channel already open");
channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
+ connection.client->getChannel().openOk(context, std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
- connection.client->getChannel().closeOk(channel.getId());
+ connection.client->getChannel().closeOk(context);
// FIXME aconway 2007-01-18: Following line destroys this. Ugly.
connection.closeChannel(channel.getId());
}
@@ -235,7 +235,7 @@
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -258,19 +258,19 @@
}
}
if(!nowait){
- connection.client->getExchange().declareOk(channel.getId());
+ connection.client->getExchange().declareOk(context);
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
+ if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -301,11 +301,11 @@
}
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount());
+ connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -314,7 +314,7 @@
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(channel.getId());
+ if(!nowait) connection.client->getQueue().bindOk(context);
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -341,14 +341,14 @@
connection.client->getQueue().unbindOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -368,21 +368,21 @@
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().deleteOk(context, count);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(channel.getId());
+ connection.client->getBasic().qosOk(context);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- const MethodContext&, u_int16_t /*ticket*/,
+ const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -398,7 +398,7 @@
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag);
+ if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -409,10 +409,10 @@
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
+ if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
@@ -429,12 +429,12 @@
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(channel.getId(), clusterId);
+ connection.client->getBasic().getEmpty(context, clusterId);
}
}
@@ -452,20 +452,20 @@
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(channel.getId());
+ connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(channel.getId());
+ connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(channel.getId());
+ connection.client->getTx().rollbackOk(context);
channel.recover(false);
}
@@ -499,6 +499,7 @@
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :
+ ChannelAdapter(c.getOutput(), ch->getId()),
channel(ch),
connection(c),
broker(b),
@@ -507,24 +508,25 @@
assert(ch);
}
-void BrokerAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void BrokerAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
{
try{
- // FIXME aconway 2007-01-17: invoke to take Channel&?
- method->invoke(*serverOps, channel->getId());
+ method->invoke(*serverOps, context);
}catch(ChannelException& e){
- connection.closeChannel(channel->getId());
+ connection.closeChannel(getId());
connection.client->getChannel().close(
- channel->getId(), e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
connection.client->getConnection().close(
- 0, e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
connection.client->getConnection().close(
- 0, 541/*internal error*/, e.what(),
+ context, 541/*internal error*/, e.what(),
method->amqpClassId(), method->amqpMethodId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Fri Jan 19 13:33:27 2007
@@ -22,6 +22,7 @@
#include "AMQP_ServerOperations.h"
#include "BodyHandler.h"
#include "BrokerChannel.h"
+#include "amqp_types.h"
namespace qpid {
namespace broker {
@@ -40,19 +41,22 @@
*
* Owns a channel, has references to Connection and Broker.
*/
-class BrokerAdapter : public qpid::framing::BodyHandler
+class BrokerAdapter : public qpid::framing::ChannelAdapter
{
public:
// FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel>
BrokerAdapter(Channel* ch, Connection&, Broker&);
Channel& getChannel() { return *channel; }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
private:
+ void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
class ServerOps;
std::auto_ptr<Channel> channel;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Fri Jan 19 13:33:27 2007
@@ -45,7 +45,9 @@
#include <OutputHandler.h>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
+#include <AMQHeartbeatBody.h>
#include <BasicPublishBody.h>
+#include "ChannelAdapter.h"
namespace qpid {
namespace broker {
@@ -56,8 +58,7 @@
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : private MessageBuilder::CompletionHandler
-{
+class Channel : private MessageBuilder::CompletionHandler {
class ConsumerImpl : public virtual Consumer
{
Channel* parent;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Fri Jan 19 13:33:27 2007
@@ -26,6 +26,7 @@
#include <MessageStore.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Fri Jan 19 13:33:27 2007
@@ -31,12 +31,12 @@
namespace qpid {
namespace broker {
-Connection::Connection(SessionContext* context_, Broker& broker_) :
- context(context_),
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
framemax(65536),
heartbeat(0),
broker(broker_),
- settings(broker.getTimeout(), broker.getStagingThreshold())
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
+ out(out_)
{}
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
@@ -68,14 +68,15 @@
// TODO aconway 2007-01-16: correct error code.
throw ConnectionException(0, "Connection initiated twice");
client.reset(new qpid::framing::AMQP_ClientProxy(
- context, header->getMajor(), header->getMinor()));
+ out, header->getMajor(), header->getMinor()));
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- // TODO aconway 2007-01-16: Move to adapter.
+ // TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- 0, header->getMajor(), header->getMinor(), properties,
- mechanisms, locales);
+ MethodContext(0, out),
+ header->getMajor(), header->getMinor(),
+ properties, mechanisms, locales);
}
void Connection::idleOut(){}
@@ -105,7 +106,7 @@
AdapterMap::iterator i = adapters.find(id);
if (i == adapters.end()) {
Channel* ch=new Channel(
- client->getProtocolVersion(), context, id,
+ client->getProtocolVersion(), out, id,
framemax, broker.getQueues().getStore(),
settings.stagingThreshold);
BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.h Fri Jan 19 13:33:27 2007
@@ -29,7 +29,7 @@
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
#include "Broker.h"
@@ -50,18 +50,8 @@
class Connection : public qpid::sys::ConnectionInputHandler,
public ConnectionToken
{
- typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
-
- // FIXME aconway 2007-01-16: on broker.
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- Exchange::shared_ptr findExchange(const string& name);
-
- BrokerAdapter& getAdapter(u_int16_t id);
-
- AdapterMap adapters;
-
public:
- Connection(qpid::sys::SessionContext* context, Broker& broker);
+ Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker);
// ConnectionInputHandler methods
void received(qpid::framing::AMQFrame* frame);
void initiated(qpid::framing::ProtocolInitiation* header);
@@ -69,8 +59,9 @@
void idleIn();
void closed();
+ qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
+
// FIXME aconway 2007-01-16: encapsulate.
- qpid::sys::SessionContext* context;
u_int32_t framemax;
u_int16_t heartbeat;
Broker& broker;
@@ -91,6 +82,19 @@
Channel& newChannel(u_int16_t channel);
Channel& getChannel(u_int16_t channel);
void closeChannel(u_int16_t channel);
+
+ private:
+ typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
+
+ // FIXME aconway 2007-01-16: on broker.
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ Exchange::shared_ptr findExchange(const string& name);
+
+ BrokerAdapter& getAdapter(u_int16_t id);
+
+ AdapterMap adapters;
+ qpid::sys::ConnectionOutputHandler* out;
+
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.cpp Fri Jan 19 13:33:27 2007
@@ -35,9 +35,9 @@
}
qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::SessionContext* ctxt)
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
{
- return new Connection(ctxt, broker);
+ return new Connection(out, broker);
}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ConnectionFactory.h Fri Jan 19 13:33:27 2007
@@ -32,7 +32,7 @@
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt);
+ virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler* ctxt);
virtual ~ConnectionFactory();
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp Fri Jan 19 13:33:27 2007
@@ -19,6 +19,7 @@
*
*/
#include <InMemoryContent.h>
+#include "AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp Fri Jan 19 13:33:27 2007
@@ -19,6 +19,7 @@
*
*/
#include <LazyLoadedContent.h>
+#include "AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Fri Jan 19 13:33:27 2007
@@ -64,7 +64,7 @@
}
void
-MessageHandlerImpl::consume(const MethodContext&,
+MessageHandlerImpl::consume(const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& destination,
@@ -85,7 +85,7 @@
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(channel.getId());
+ connection.client->getMessageHandler()->ok(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -102,7 +102,7 @@
}
void
-MessageHandlerImpl::get( const MethodContext&,
+MessageHandlerImpl::get( const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& /*destination*/,
@@ -110,12 +110,12 @@
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue =
+ connection.getQueue(queueName, context.channelId);
// FIXME: get is probably Basic specific
- if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
-
- connection.client->getMessageHandler()->empty(channel.getId());
+ if(!channel.get(queue, !noAck)){
+ connection.client->getMessageHandler()->empty(context);
}
}
@@ -141,7 +141,7 @@
}
void
-MessageHandlerImpl::qos(const MethodContext&,
+MessageHandlerImpl::qos(const MethodContext& context,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool /*global*/ )
@@ -152,7 +152,7 @@
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(channel.getId());
+ connection.client->getMessageHandler()->ok(context);
}
void
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h Fri Jan 19 13:33:27 2007
@@ -37,62 +37,62 @@
MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
: channel(ch), connection(c), broker(b) {}
- void append(const qpid::framing::MethodContext&,
+ void append(const framing::MethodContext&,
const std::string& reference,
const std::string& bytes );
- void cancel(const qpid::framing::MethodContext&,
+ void cancel(const framing::MethodContext&,
const std::string& destination );
- void checkpoint(const qpid::framing::MethodContext&,
+ void checkpoint(const framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void close(const qpid::framing::MethodContext&,
+ void close(const framing::MethodContext&,
const std::string& reference );
- void consume(const qpid::framing::MethodContext&,
+ void consume(const framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
bool noLocal,
bool noAck,
bool exclusive,
- const qpid::framing::FieldTable& filter );
+ const framing::FieldTable& filter );
- void empty( const qpid::framing::MethodContext& );
+ void empty( const framing::MethodContext& );
- void get(const qpid::framing::MethodContext&,
+ void get(const framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
bool noAck );
- void offset(const qpid::framing::MethodContext&,
+ void offset(const framing::MethodContext&,
u_int64_t value );
- void ok( const qpid::framing::MethodContext& );
+ void ok( const framing::MethodContext& );
- void open(const qpid::framing::MethodContext&,
+ void open(const framing::MethodContext&,
const std::string& reference );
- void qos(const qpid::framing::MethodContext&,
+ void qos(const framing::MethodContext&,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool global );
- void recover(const qpid::framing::MethodContext&,
+ void recover(const framing::MethodContext&,
bool requeue );
- void reject(const qpid::framing::MethodContext&,
+ void reject(const framing::MethodContext&,
u_int16_t code,
const std::string& text );
- void resume(const qpid::framing::MethodContext&,
+ void resume(const framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void transfer(const qpid::framing::MethodContext&,
+ void transfer(const framing::MethodContext&,
u_int16_t ticket,
const std::string& destination,
bool redelivered,
@@ -113,8 +113,8 @@
const std::string& appId,
const std::string& transactionId,
const std::string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
+ const framing::FieldTable& applicationHeaders,
+ framing::Content body );
};
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Fri Jan 19 13:33:27 2007
@@ -256,6 +256,16 @@
sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
+void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
+void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
void Channel::handleMethod(AMQMethodBody::shared_ptr body){
//channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Fri Jan 19 13:33:27 2007
@@ -67,7 +67,9 @@
*
* \ingroup clientapi
*/
- class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
+ class Channel : private virtual framing::BodyHandler,
+ public virtual sys::Runnable
+ {
struct Consumer{
MessageListener* listener;
int ackMode;
@@ -78,36 +80,38 @@
u_int16_t id;
Connection* con;
- qpid::sys::Thread dispatcher;
- qpid::framing::OutputHandler* out;
+ sys::Thread dispatcher;
+ framing::OutputHandler* out;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
IncomingMessage* retrieved;//holds response to basic.get
- qpid::sys::Monitor dispatchMonitor;
- qpid::sys::Monitor retrievalMonitor;
+ sys::Monitor dispatchMonitor;
+ sys::Monitor retrievalMonitor;
std::map<std::string, Consumer*> consumers;
ReturnedMessageHandler* returnsHandler;
bool closed;
u_int16_t prefetch;
const bool transactional;
- qpid::framing::ProtocolVersion version;
+ framing::ProtocolVersion version;
void enqueue();
void retrieve(Message& msg);
IncomingMessage* dequeue();
void dispatch();
void stop();
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
void deliver(Consumer* consumer, Message& msg);
void setQos();
void cancelAll();
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+ void handleRequest(framing::AMQRequestBody::shared_ptr);
+ void handleResponse(framing::AMQResponseBody::shared_ptr);
public:
/**
@@ -185,7 +189,7 @@
* is received from the broker
*/
void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const qpid::framing::FieldTable& args, bool synch = true);
+ const framing::FieldTable& args, bool synch = true);
/**
* Creates a 'consumer' for a queue. Messages in (or arriving
* at) that queue will be delivered to consumers
@@ -216,7 +220,7 @@
void consume(
Queue& queue, std::string& tag, MessageListener* listener,
int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const qpid::framing::FieldTable* fields = 0);
+ const framing::FieldTable* fields = 0);
/**
* Cancels a subscription previously set up through a call to consume().
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Fri Jan 19 13:33:27 2007
@@ -184,6 +184,16 @@
}
}
+void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
+void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
void Connection::handleMethod(AMQMethodBody::shared_ptr body){
//connection.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Fri Jan 19 13:33:27 2007
@@ -66,7 +66,8 @@
class Connection : public virtual qpid::framing::InputHandler,
public virtual qpid::sys::TimeoutHandler,
public virtual qpid::sys::ShutdownHandler,
- private virtual qpid::framing::BodyHandler{
+ private virtual qpid::framing::BodyHandler
+ {
typedef std::map<int, Channel*>::iterator iterator;
@@ -80,20 +81,25 @@
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
- qpid::framing::ProtocolVersion version;
- qpid::framing::Requester requester;
- qpid::framing::Responder responder;
+ framing::ProtocolVersion version;
+ framing::Requester requester;
+ framing::Responder responder;
- void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+ void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
- void handleFrame(qpid::framing::AMQFrame* frame);
+ // FIXME aconway 2007-01-19: Use channel(0) not connection
+ // to handle channel 0 requests. Remove handler methods.
+ //
+ void handleRequest(framing::AMQRequestBody::shared_ptr);
+ void handleResponse(framing::AMQResponseBody::shared_ptr);
+ void handleMethod(framing::AMQMethodBody::shared_ptr);
+ void handleHeader(framing::AMQHeaderBody::shared_ptr);
+ void handleContent(framing::AMQContentBody::shared_ptr);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr);
+ void handleFrame(framing::AMQFrame* frame);
public:
/**
@@ -110,7 +116,7 @@
* client will accept. Optional and defaults to 65536.
*/
Connection( bool debug = false, u_int32_t max_frame_size = 65536,
- qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion));
+ framing::ProtocolVersion* _version = &(framing::highestProtocolVersion));
~Connection();
/**
@@ -163,7 +169,7 @@
*/
void removeChannel(Channel* channel);
- virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void received(framing::AMQFrame* frame);
virtual void idleOut();
virtual void idleIn();
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am Fri Jan 19 13:33:27 2007
@@ -65,6 +65,7 @@
$(framing)/AMQMethodBody.cpp \
$(framing)/BasicHeaderProperties.cpp \
$(framing)/BodyHandler.cpp \
+ $(framing)/ChannelAdapter.cpp \
$(framing)/Buffer.cpp \
$(framing)/FieldTable.cpp \
$(framing)/FramingContent.cpp \
@@ -96,6 +97,7 @@
$(framing)/AMQMethodBody.h \
$(framing)/BasicHeaderProperties.h \
$(framing)/BodyHandler.h \
+ $(framing)/ChannelAdapter.h \
$(framing)/Buffer.h \
$(framing)/FieldTable.h \
$(framing)/FramingContent.h \
@@ -119,7 +121,7 @@
sys/Monitor.h \
sys/Mutex.h \
sys/Runnable.h \
- sys/SessionContext.h \
+ sys/ConnectionOutputHandler.h \
sys/ConnectionInputHandler.h \
sys/ConnectionInputHandlerFactory.h \
sys/ShutdownHandler.h \
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp Fri Jan 19 13:33:27 2007
@@ -28,15 +28,15 @@
AMQP_MethodVersionMap AMQFrame::versionMap;
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version):
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version):
version(_version)
{}
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
version(_version), channel(_channel), body(_body)
{}
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
version(_version), channel(_channel), body(_body)
{}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h Fri Jan 19 13:33:27 2007
@@ -41,9 +41,9 @@
class AMQFrame : virtual public AMQDataBlock
{
public:
- AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
- AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
- AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
virtual ~AMQFrame();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp Fri Jan 19 13:33:27 2007
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <AMQFrame.h>
#include <AMQMethodBody.h>
#include <QpidError.h>
#include "AMQP_MethodVersionMap.h"
@@ -59,5 +60,8 @@
decodeContent(buffer);
}
+void AMQMethodBody::send(const MethodContext& context) {
+ context.out->send(new AMQFrame(version, context.channelId, this));
+}
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h Fri Jan 19 13:33:27 2007
@@ -53,6 +53,13 @@
virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
bool match(AMQMethodBody* other) const;
+
+ /**
+ * Wrap this method in a frame and send using the current context.
+ * Note the frame takes ownership of the body, it will be deleted.
+ */
+ virtual void send(const MethodContext& context);
+
protected:
static u_int32_t baseSize() { return 4; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp Fri Jan 19 13:33:27 2007
@@ -16,6 +16,7 @@
*
*/
+#include "AMQFrame.h"
#include "AMQResponseBody.h"
#include "AMQP_MethodVersionMap.h"
@@ -61,5 +62,11 @@
<< ",batch=" << data.batchOffset << "): ";
}
+void AMQResponseBody::send(const MethodContext& context) {
+ setRequestId(context.requestId);
+ assert(context.out);
+ context.out->send(
+ new AMQFrame(version, context.channelId, this));
+}
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h Fri Jan 19 13:33:27 2007
@@ -65,6 +65,11 @@
ResponseId getResponseId() { return data.responseId; }
RequestId getRequestId() { return data.requestId; }
BatchOffset getBatchOffset() { return data.batchOffset; }
+ void setResponseId(ResponseId id) { data.responseId = id; }
+ void setRequestId(RequestId id) { data.requestId = id; }
+ void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
+
+ virtual void send(const MethodContext& context);
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.cpp Fri Jan 19 13:33:27 2007
@@ -58,22 +58,3 @@
}
}
-void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) {
- responder.received(request->getData());
- handleMethod(request);
-}
-
-void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) {
- handleMethod(response);
- requester.processed(response->getData());
-}
-
-void BodyHandler::assertChannelZero(u_int16_t id) {
- if (id != 0)
- throw ConnectionException(504, "Invalid channel id, not 0");
-}
-
-void BodyHandler::assertChannelNonZero(u_int16_t id) {
- if (id == 0)
- throw ConnectionException(504, "Invalid channel id 0");
-}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/BodyHandler.h Fri Jan 19 13:33:27 2007
@@ -38,35 +38,21 @@
class AMQHeartbeatBody;
/**
- * Base class for client and broker channel handlers.
- *
- * Handles request/response id management common to client and broker.
- * Derived classes provide remaining client/broker specific handling.
+ * Interface to handle incoming frame bodies.
+ * Derived classes provide logic for each frame type.
*/
class BodyHandler {
public:
virtual ~BodyHandler();
-
- void handleBody(boost::shared_ptr<AMQBody> body);
+ virtual void handleBody(boost::shared_ptr<AMQBody> body);
protected:
- virtual void handleRequest(boost::shared_ptr<AMQRequestBody>);
- virtual void handleResponse(boost::shared_ptr<AMQResponseBody>);
-
+ virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0;
+ virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0;
virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
-
- protected:
- /** Throw protocol exception if this is not channel 0. */
- static void assertChannelZero(u_int16_t id);
- /** Throw protocol exception if this is channel 0. */
- static void assertChannelNonZero(u_int16_t id);
-
- private:
- Requester requester;
- Responder responder;
};
}}
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=auto&rev=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp Fri Jan 19 13:33:27 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ChannelAdapter.h"
+#include "AMQFrame.h"
+
+namespace qpid {
+namespace framing {
+
+void ChannelAdapter::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ switch (body->type()) {
+ case REQUEST_BODY: {
+ AMQRequestBody::shared_ptr request =
+ boost::shared_polymorphic_downcast<AMQRequestBody>(body);
+ requester.sending(request->getData());
+ break;
+ }
+ case RESPONSE_BODY: {
+ AMQResponseBody::shared_ptr response =
+ boost::shared_polymorphic_downcast<AMQResponseBody>(body);
+ responder.sending(response->getData());
+ break;
+ }
+ }
+ out.send(frame);
+}
+
+void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
+ responder.received(request->getData());
+ MethodContext context(id, &out, request->getRequestId());
+ handleMethodInContext(request, context);
+}
+
+void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
+ handleMethod(response);
+ requester.processed(response->getData());
+}
+
+void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
+ MethodContext context(id, this);
+ handleMethodInContext(method, context);
+}
+
+void ChannelAdapter::assertChannelZero(u_int16_t id) {
+ if (id != 0)
+ throw ConnectionException(504, "Invalid channel id, not 0");
+}
+
+void ChannelAdapter::assertChannelNonZero(u_int16_t id) {
+ if (id == 0)
+ throw ConnectionException(504, "Invalid channel id 0");
+}
+
+}} // namespace qpid::framing
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=auto&rev=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h Fri Jan 19 13:33:27 2007
@@ -0,0 +1,90 @@
+#ifndef _ChannelAdapter_
+#define _ChannelAdapter_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "BodyHandler.h"
+#include "Requester.h"
+#include "Responder.h"
+#include "OutputHandler.h"
+
+namespace qpid {
+namespace framing {
+
+class MethodContext;
+
+/**
+ * Base class for client and broker channel adapters.
+ *
+ * As BodyHandler:
+ * - Creates MethodContext and dispatches methods+context to derived class.
+ * - Updates request/response ID data.
+ *
+ * As OutputHandler:
+ * - Updates request/resposne ID data.
+ *
+ */
+class ChannelAdapter : public BodyHandler, public OutputHandler {
+ public:
+ /**
+ *@param output Processed frames are forwarded to this handler.
+ */
+ ChannelAdapter(OutputHandler& output, ChannelId channelId)
+ : id(channelId), out(output) {}
+
+ ChannelId getId() { return id; }
+
+ /**
+ * Do request/response-id processing and then forward to
+ * handler provided to constructor. Response frames should
+ * have their request-id set before calling send.
+ */
+ void send(AMQFrame* frame);
+
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
+ void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
+ void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+
+ protected:
+ /** Throw protocol exception if this is not channel 0. */
+ static void assertChannelZero(u_int16_t id);
+ /** Throw protocol exception if this is channel 0. */
+ static void assertChannelNonZero(u_int16_t id);
+
+ virtual void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context) = 0;
+
+ ChannelId id;
+
+ private:
+ Requester requester;
+ Responder responder;
+ OutputHandler& out;
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h Fri Jan 19 13:33:27 2007
@@ -19,6 +19,9 @@
*
*/
+#include "OutputHandler.h"
+#include "ProtocolVersion.h"
+
namespace qpid {
namespace framing {
@@ -26,11 +29,14 @@
/**
* Invocation context for an AMQP method.
+ * Some of the context information is related to the channel, some
+ * to the specific invocation - e.g. requestId.
+ *
* All generated proxy and handler functions take a MethodContext parameter.
*
- * The user calling on a broker proxy can simply pass an integer
- * channel ID, it will implicitly be converted to an appropriate context.
- *
+ * The user does not need to create MethodContext objects explicitly,
+ * the constructor will implicitly create one from a channel ID.
+ *
* Other context members are for internal use.
*/
struct MethodContext
@@ -39,13 +45,21 @@
* Passing a integer channel-id in place of a MethodContext
* will automatically construct the MethodContext.
*/
- MethodContext(ChannelId channel, RequestId request=0)
- : channelId(channel), requestId(request) {}
+ MethodContext(
+ ChannelId channel, OutputHandler* output=0, RequestId request=0)
+ : channelId(channel), out(output), requestId(request){}
+
+ /** \internal Channel on which the method is sent. */
+ const ChannelId channelId;
+
+ /** Output handler for responses in this context */
+ OutputHandler* out;
+
+ /** \internal If we are in the context of processing an incoming request,
+ * this is the ID. Otherwise it is 0.
+ */
+ const RequestId requestId;
- /** Channel on which the method is sent. */
- ChannelId channelId;
- /** \internal For proxy response: the original request or 0. */
- RequestId requestId;
};
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/OutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/OutputHandler.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/OutputHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/OutputHandler.h Fri Jan 19 13:33:27 2007
@@ -22,10 +22,10 @@
*
*/
#include <boost/noncopyable.hpp>
-#include <AMQFrame.h>
namespace qpid {
namespace framing {
+class AMQFrame;
class OutputHandler : private boost::noncopyable {
public:
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionInputHandlerFactory.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionInputHandlerFactory.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionInputHandlerFactory.h Fri Jan 19 13:33:27 2007
@@ -26,7 +26,7 @@
namespace qpid {
namespace sys {
-class SessionContext;
+class ConnectionOutputHandler;
class ConnectionInputHandler;
/**
@@ -36,7 +36,7 @@
class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0;
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};
Copied: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionOutputHandler.h (from r497431, incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/SessionContext.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionOutputHandler.h?view=diff&rev=497963&p1=incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/SessionContext.h&r1=497431&p2=incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionOutputHandler.h&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/SessionContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ConnectionOutputHandler.h Fri Jan 19 13:33:27 2007
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _SessionContext_
-#define _SessionContext_
+#ifndef _ConnectionOutputHandler_
+#define _ConnectionOutputHandler_
#include <OutputHandler.h>
@@ -29,7 +29,7 @@
/**
* Provides the output handler associated with a connection.
*/
-class SessionContext : public virtual qpid::framing::OutputHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler
{
public:
virtual void close() = 0;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/LFSessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/LFSessionContext.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/LFSessionContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/LFSessionContext.h Fri Jan 19 13:33:27 2007
@@ -30,7 +30,7 @@
#include <AMQFrame.h>
#include <Buffer.h>
#include <sys/Monitor.h>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include "APRSocket.h"
@@ -40,7 +40,7 @@
namespace sys {
-class LFSessionContext : public virtual qpid::sys::SessionContext
+class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
{
const bool debug;
APRSocket socket;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp Fri Jan 19 13:33:27 2007
@@ -26,7 +26,7 @@
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/ConnectionInputHandlerFactory.h>
#include <sys/Acceptor.h>
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelConnection.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelConnection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/EventChannelConnection.h Fri Jan 19 13:33:27 2007
@@ -23,7 +23,7 @@
#include "EventChannelThreads.h"
#include "sys/Monitor.h"
-#include "sys/SessionContext.h"
+#include "sys/ConnectionOutputHandler.h"
#include "sys/ConnectionInputHandler.h"
#include "sys/AtomicCount.h"
#include "framing/AMQFrame.h"
@@ -34,13 +34,13 @@
class ConnectionInputHandlerFactory;
/**
- * Implements SessionContext and delegates to a ConnectionInputHandler
+ * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
* for a connection via the EventChannel.
*@param readDescriptor file descriptor for reading.
*@param writeDescriptor file descriptor for writing,
* by default same as readDescriptor
*/
-class EventChannelConnection : public SessionContext {
+class EventChannelConnection : public ConnectionOutputHandler {
public:
EventChannelConnection(
EventChannelThreads::shared_ptr threads,
@@ -50,7 +50,7 @@
bool isTrace = false
);
- // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
+ // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
virtual void send(qpid::framing::AMQFrame* frame) {
send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/check.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/check.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/check.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/posix/check.h Fri Jan 19 13:33:27 2007
@@ -45,7 +45,7 @@
Exception* clone() const throw() { return new PosixError(*this); }
- void throwSelf() { throw *this; }
+ void throwSelf() const { throw *this; }
private:
int errNo;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Fri Jan 19 13:33:27 2007
@@ -27,7 +27,7 @@
#include <iostream>
#include <memory>
#include <AMQP_HighestVersion.h>
-
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
@@ -107,6 +107,9 @@
handle(call);
}
+ // Don't hide overloads.
+ using NullMessageStore::destroy;
+
void destroy(Message* msg)
{
MethodCall call = {"destroy", msg, ""};
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp Fri Jan 19 13:33:27 2007
@@ -23,6 +23,7 @@
#include <AMQP_HighestVersion.h>
#include <iostream>
#include <list>
+#include "AMQFrame.h"
using std::list;
using std::string;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp Fri Jan 19 13:33:27 2007
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <sstream>
+#include "AMQFrame.h"
using std::list;
using std::string;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp Fri Jan 19 13:33:27 2007
@@ -71,6 +71,9 @@
}
}
+ // Don't hide overloads.
+ using NullMessageStore::destroy;
+
void destroy(Message* msg)
{
CPPUNIT_ASSERT(msg->getPersistenceId());
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp Fri Jan 19 13:33:27 2007
@@ -22,6 +22,7 @@
#include <qpid_test_plugin.h>
#include <iostream>
#include <AMQP_HighestVersion.h>
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MockConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MockConnectionInputHandler.h?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MockConnectionInputHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MockConnectionInputHandler.h Fri Jan 19 13:33:27 2007
@@ -89,7 +89,7 @@
struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory {
MockConnectionInputHandlerFactory() : handler(0) {}
- qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext*) {
+ qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler*) {
qpid::sys::Monitor::ScopedLock lock(monitor);
handler = new MockConnectionInputHandler();
monitor.notifyAll();
Modified: incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java?view=diff&rev=497963&r1=497962&r2=497963
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java (original)
+++ incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java Fri Jan 19 13:33:27 2007
@@ -811,7 +811,7 @@
for (String thisClassName : model.classMap.keySet())
{
AmqpClass thisClass = model.classMap.get(thisClassName);
- sb.append(indent + "virtual inline " + outerClassName + "::" + thisClass.name + "Handler* get" +
+ sb.append(indent + "virtual " + outerClassName + "::" + thisClass.name + "Handler* get" +
thisClass.name + "Handler() { return &" + Utils.firstLower(thisClass.name) + ";}" + cr);
}
return sb.toString();
@@ -1064,10 +1064,10 @@
String indent = Utils.createSpaces(indentSize);
String tab = Utils.createSpaces(tabSize);
String namespace = version != null ? version.namespace() + "::" : "";
- StringBuffer sb = new StringBuffer(indent + "out->send( new AMQFrame( parent->getProtocolVersion(), context.channelId," + cr);
- sb.append(indent + tab + "new " + namespace + methodBodyClassName + "( parent->getProtocolVersion()");
+ StringBuffer sb = new StringBuffer();
+ sb.append(indent + tab + "(new " + namespace + methodBodyClassName + "( parent->getProtocolVersion()");
sb.append(generateMethodParameterList(fieldMap, indentSize + (5*tabSize), true, false, true));
- sb.append(" )));" + cr);
+ sb.append("))->send(context);\n");
return sb.toString();
}
@@ -1145,7 +1145,7 @@
for (Integer thisOrdinal : ordinalFieldMap.keySet())
{
String[] fieldDomainPair = ordinalFieldMap.get(thisOrdinal);
- sb.append(indent + "inline " + setRef(fieldDomainPair[FIELD_CODE_TYPE]) + " get" +
+ sb.append(indent + "" + setRef(fieldDomainPair[FIELD_CODE_TYPE]) + " get" +
Utils.firstUpper(fieldDomainPair[FIELD_NAME]) + "() { return " +
fieldDomainPair[FIELD_NAME] + "; }" + cr);
}
@@ -1451,7 +1451,7 @@
if (bItr.next()) // This is a server operation
{
boolean fieldMapNotEmptyFlag = method.fieldMap.size() > 0;
- sb.append(indent + "inline void invoke(AMQP_ServerOperations& target, const MethodContext& context)" + cr);
+ sb.append(indent + "void invoke(AMQP_ServerOperations& target, const MethodContext& context)" + cr);
sb.append(indent + "{" + cr);
sb.append(indent + tab + "target.get" + thisClass.name + "Handler()->" +
parseForReservedWords(Utils.firstLower(method.name),