You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/18 21:43:34 UTC
svn commit: r577027 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/framing/
Author: aconway
Date: Tue Sep 18 12:43:29 2007
New Revision: 577027
URL: http://svn.apache.org/viewvc?rev=577027&view=rev
Log:
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code.
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Sep 18 12:43:29 2007
@@ -247,7 +247,6 @@
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
- qpid/broker/HandlerImpl.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Sep 18 12:43:29 2007
@@ -38,42 +38,35 @@
// by the handlers responsible for those classes.
//
-BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
- CoreRefs(s,
- s.getAdapter()->getConnection(),
- s.getAdapter()->getConnection().broker,
- a),
- basicHandler(*this),
- exchangeHandler(*this),
- bindingHandler(*this),
- messageHandler(*this),
- queueHandler(*this),
- txHandler(*this),
- dtxHandler(*this)
+BrokerAdapter::BrokerAdapter(Session& s) :
+ HandlerImpl(s),
+ basicHandler(s),
+ exchangeHandler(s),
+ bindingHandler(s),
+ messageHandler(s),
+ queueHandler(s),
+ txHandler(s),
+ dtxHandler(s)
{}
-ProtocolVersion BrokerAdapter::getVersion() const {
- return connection.getVersion();
-}
-
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
- alternate = broker.getExchanges().get(alternateExchange);
+ alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
- Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+ Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
if (response.second) {
if (durable) {
- broker.getStore().create(*response.first);
+ getBroker().getStore().create(*response.first);
}
if (alternate) {
response.first->setAlternate(alternate);
@@ -109,17 +102,17 @@
void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
- Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
- if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- broker.getExchanges().destroy(name);
+ getBroker().getExchanges().destroy(name);
}
ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
- Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
return ExchangeQueryResult("", false, true, FieldTable());
@@ -134,12 +127,12 @@
{
Exchange::shared_ptr exchange;
try {
- exchange = broker.getExchanges().get(exchangeName);
+ exchange = getBroker().getExchanges().get(exchangeName);
} catch (const ChannelException&) {}
Queue::shared_ptr queue;
if (!queueName.empty()) {
- queue = broker.getQueues().find(queueName);
+ queue = getBroker().getQueues().find(queueName);
}
if (!exchange) {
@@ -160,7 +153,7 @@
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = session.getQueue(name);
+ Queue::shared_ptr queue = getSession().getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -179,22 +172,22 @@
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
- alternate = broker.getExchanges().get(alternateExchange);
+ alternate = getBroker().getExchanges().get(alternateExchange);
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = session.getQueue(name);
+ queue = getSession().getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(
+ getBroker().getQueues().declare(
name, durable,
autoDelete && !exclusive,
- exclusive ? &connection : 0);
+ exclusive ? &getConnection() : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- session.setDefaultQueue(queue);
+ getSession().setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -204,16 +197,16 @@
queue_created.first->create(arguments);
//add default binding:
- broker.getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(broker.getExchanges().getDefault()->getName(), name, arguments);
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
//handle automatic cleanup:
if (exclusive) {
- connection.exclusiveQueues.push_back(queue);
+ getConnection().exclusiveQueues.push_back(queue);
}
}
}
- if (exclusive && !queue->isExclusiveOwner(&connection))
+ if (exclusive && !queue->isExclusiveOwner(&getConnection()))
throw ResourceLockedException(
QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
@@ -223,14 +216,14 @@
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = session.getQueue(queueName);
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
queue->bound(exchangeName, routingKey, arguments);
if (exchange->isDurable() && queue->isDurable()) {
- broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+ getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
}else{
@@ -246,38 +239,38 @@
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
- broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments);
}
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- session.getQueue(queue)->purge();
+ getSession().getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = session.getQueue(queue);
+ Queue::shared_ptr q = getSession().getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
throw PreconditionFailedException("Queue in use.");
}else{
//remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&connection)){
- QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
- if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+ if(q->isExclusiveOwner(&getConnection())){
+ QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
+ if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
}
q->destroy();
- broker.getQueues().destroy(queue);
- q->unbind(broker.getExchanges(), q);
+ getBroker().getQueues().destroy(queue);
+ q->unbind(getBroker().getExchanges(), q);
}
}
@@ -286,8 +279,8 @@
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- session.setPrefetchSize(prefetchSize);
- session.setPrefetchCount(prefetchCount);
+ getSession().setPrefetchSize(prefetchSize);
+ getSession().setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -296,8 +289,8 @@
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = session.getQueue(queueName);
- if(!consumerTag.empty() && session.exists(consumerTag)){
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ if(!consumerTag.empty() && getSession().exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -305,33 +298,34 @@
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+ getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
- if(!nowait) client.consumeOk(newTag);
+ if(!nowait)
+ getProxy().getBasic().consumeOk(newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- session.cancel(consumerTag);
+ getSession().cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!session.get(token, queue, !noAck)){
+ if(!getSession().get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId);
+ getProxy().getBasic().getEmpty(clusterId);
}
}
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- session.ackCumulative(deliveryTag);
+ getSession().ackCumulative(deliveryTag);
} else {
- session.ackRange(deliveryTag, deliveryTag);
+ getSession().ackRange(deliveryTag, deliveryTag);
}
}
@@ -339,23 +333,23 @@
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- session.recover(requeue);
+ getSession().recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- session.startTx();
+ getSession().startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- session.commit(&broker.getStore());
+ getSession().commit(&getBroker().getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- session.rollback();
- session.recover(false);
+ getSession().rollback();
+ getSession().recover(false);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Sep 18 12:43:29 2007
@@ -19,7 +19,6 @@
*
*/
#include "DtxHandlerImpl.h"
-#include "HandlerImpl.h"
#include "MessageHandlerImpl.h"
#include "NameGenerator.h"
#include "qpid/Exception.h"
@@ -55,18 +54,28 @@
* peer.
*
*/
-class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
+
+// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way
+// to group methods as seen by the BADHANDLERs below.
+// Handlers should be grouped by layer, the BrokerAdapter stuff
+// belongs on the SemanticHandler.
+//
+class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Session& session, framing::ChannelAdapter& a);
+ BrokerAdapter(Session& session);
- framing::ProtocolVersion getVersion() const;
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
BindingHandler* getBindingHandler() { return &bindingHandler; }
QueueHandler* getQueueHandler() { return &queueHandler; }
TxHandler* getTxHandler() { return &txHandler; }
MessageHandler* getMessageHandler() { return &messageHandler; }
+ DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
+ DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+ framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); }
+
+
AccessHandler* getAccessHandler() {
throw framing::NotImplementedException("Access class not implemented"); }
FileHandler* getFileHandler() {
@@ -75,26 +84,22 @@
throw framing::NotImplementedException("Stream class not implemented"); }
TunnelHandler* getTunnelHandler() {
throw framing::NotImplementedException("Tunnel class not implemented"); }
- DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
- DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
// Handlers no longer implemented in BrokerAdapter:
#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+ ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
ChannelHandler* getChannelHandler() { BADHANDLER(); }
#undef BADHANDLER
- framing::AMQP_ClientProxy& getProxy() { return proxy; }
-
private:
class ExchangeHandlerImpl :
public ExchangeHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
+ public HandlerImpl
{
public:
- ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {}
void declare(uint16_t ticket,
const std::string& exchange, const std::string& type,
@@ -111,10 +116,10 @@
class BindingHandlerImpl :
public BindingHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Binding>
+ public HandlerImpl
{
public:
- BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ BindingHandlerImpl(Session& session) : HandlerImpl(session) {}
framing::BindingQueryResult query(u_int16_t ticket,
const std::string& exchange,
@@ -125,10 +130,10 @@
class QueueHandlerImpl :
public QueueHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Queue>
+ public HandlerImpl
{
public:
- QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ QueueHandlerImpl(Session& session) : HandlerImpl(session) {}
void declare(uint16_t ticket, const std::string& queue,
const std::string& alternateExchange,
@@ -151,12 +156,12 @@
class BasicHandlerImpl :
public BasicHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Basic>
+ public HandlerImpl
{
NameGenerator tagGenerator;
public:
- BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
+ BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {}
void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
@@ -173,10 +178,10 @@
class TxHandlerImpl :
public TxHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Tx>
+ public HandlerImpl
{
public:
- TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ TxHandlerImpl(Session& session) : HandlerImpl(session) {}
void select();
void commit();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Sep 18 12:43:29 2007
@@ -68,6 +68,7 @@
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
+ Broker& getBroker() { return broker; }
Broker& broker;
std::vector<Queue::shared_ptr> exclusiveQueues;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Sep 18 12:43:29 2007
@@ -21,6 +21,7 @@
#include "DeliveryRecord.h"
#include "DeliverableMessage.h"
#include "Session.h"
+#include "BrokerExchange.h"
#include "qpid/log/Statement.h"
using namespace qpid::broker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Tue Sep 18 12:43:29 2007
@@ -26,14 +26,14 @@
using namespace qpid::framing;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {}
// DtxDemarcationHandler:
void DtxHandlerImpl::select()
{
- session.selectDtx();
+ getSession().selectDtx();
}
DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -43,7 +43,7 @@
{
try {
if (fail) {
- session.endDtx(xid, true);
+ getSession().endDtx(xid, true);
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
@@ -51,9 +51,9 @@
}
} else {
if (suspend) {
- session.suspendDtx(xid);
+ getSession().suspendDtx(xid);
} else {
- session.endDtx(xid, false);
+ getSession().endDtx(xid, false);
}
return DtxDemarcationEndResult(XA_OK);
}
@@ -72,9 +72,9 @@
}
try {
if (resume) {
- session.resumeDtx(xid);
+ getSession().resumeDtx(xid);
} else {
- session.startDtx(xid, broker.getDtxManager(), join);
+ getSession().startDtx(xid, getBroker().getDtxManager(), join);
}
return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
@@ -88,7 +88,7 @@
const string& xid)
{
try {
- bool ok = broker.getDtxManager().prepare(xid);
+ bool ok = getBroker().getDtxManager().prepare(xid);
return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
return DtxCoordinationPrepareResult(XA_RBTIMEOUT);
@@ -100,7 +100,7 @@
bool onePhase)
{
try {
- bool ok = broker.getDtxManager().commit(xid, onePhase);
+ bool ok = getBroker().getDtxManager().commit(xid, onePhase);
return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
return DtxCoordinationCommitResult(XA_RBTIMEOUT);
@@ -112,7 +112,7 @@
const string& xid )
{
try {
- broker.getDtxManager().rollback(xid);
+ getBroker().getDtxManager().rollback(xid);
return DtxCoordinationRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
return DtxCoordinationRollbackResult(XA_RBTIMEOUT);
@@ -136,7 +136,7 @@
// note that this restricts the length of the xids more than is
// strictly 'legal', but that is ok for testing
std::set<std::string> xids;
- broker.getStore().collectPreparedXids(xids);
+ getBroker().getStore().collectPreparedXids(xids);
uint size(0);
for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
size += i->size() + 1/*shortstr size*/;
@@ -167,7 +167,7 @@
DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
{
- uint32_t timeout = broker.getDtxManager().getTimeout(xid);
+ uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
return DtxCoordinationGetTimeoutResult(timeout);
}
@@ -176,7 +176,7 @@
const string& xid,
u_int32_t timeout)
{
- broker.getDtxManager().setTimeout(xid, timeout);
+ getBroker().getDtxManager().setTimeout(xid, timeout);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -27,12 +27,12 @@
namespace broker {
class DtxHandlerImpl
- : public CoreRefs,
+ : public HandlerImpl,
public framing::AMQP_ServerOperations::DtxCoordinationHandler,
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
public:
- DtxHandlerImpl(CoreRefs& parent);
+ DtxHandlerImpl(Session&);
// DtxCoordinationHandler:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -19,49 +19,47 @@
*
*/
-#include "Broker.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include "Session.h"
+#include "SessionHandler.h"
+#include "Connection.h"
namespace qpid {
namespace broker {
-class Connection;
-class Session;
+class Broker;
/**
- * A collection of references to the core objects required by an adapter,
- * and a client proxy.
+ * Base template for protocol handler implementations.
+ * Provides convenience methods for getting common session objects.
*/
-struct CoreRefs
-{
- CoreRefs(Session& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
- : session(ch), connection(c), broker(b), adapter(a), proxy(a.getHandlers().out) {}
+class HandlerImpl {
+ protected:
+ HandlerImpl(Session& s) : session(s) {}
+
+ Session& getSession() { return session; }
+ const Session& getSession() const { return session; }
+
+ SessionHandler* getSessionHandler() { return session.getHandler(); }
+ const SessionHandler* getSessionHandler() const { return session.getHandler(); }
+
+ // Remaining functions may only be called if getSessionHandler() != 0
+ framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); }
+ const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); }
+
+ Connection& getConnection() { return getSessionHandler()->getConnection(); }
+ const Connection& getConnection() const { return getSessionHandler()->getConnection(); }
+
+ Broker& getBroker() { return getConnection().broker; }
+ const Broker& getBroker() const { return getConnection().broker; }
+ private:
Session& session;
- Connection& connection;
- Broker& broker;
- framing::ChannelAdapter& adapter;
- framing::AMQP_ClientProxy proxy;
};
-
-/**
- * Base template for protocol handler implementations.
- * Provides the core references and appropriate AMQP class proxy.
- */
-template <class ProxyType>
-struct HandlerImpl : public CoreRefs {
- typedef HandlerImpl<ProxyType> HandlerImplType;
- HandlerImpl(CoreRefs& parent)
- : CoreRefs(parent), client(ProxyType::get(proxy)) {}
- ProxyType client;
-};
-
-
-
}} // namespace qpid::broker
#endif /*!_broker_HandlerImpl_h*/
+
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Sep 18 12:43:29 2007
@@ -139,7 +139,7 @@
frames.remove(TypeFilter(CONTENT_BODY));
}
-void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize)
+void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize)
{
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
@@ -148,7 +148,7 @@
for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
{
uint64_t remaining = expectedSize - offset;
- AMQFrame frame(channel, AMQContentBody());
+ AMQFrame frame(0, AMQContentBody());
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(*this, data, offset,
@@ -168,15 +168,14 @@
Count c;
frames.map_if(c, TypeFilter(CONTENT_BODY));
- SendContent f(out, channel, maxFrameSize, c.getCount());
+ SendContent f(out, maxFrameSize, c.getCount());
frames.map_if(f, TypeFilter(CONTENT_BODY));
}
}
-void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/)
+void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
- Relay f(out, channel);
- frames.map_if(f, TypeFilter(HEADER_BODY));
+ frames.map_if(out, TypeFilter(HEADER_BODY));
}
MessageAdapter& Message::getAdapter() const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Sep 18 12:43:29 2007
@@ -114,8 +114,8 @@
*/
void releaseContent(MessageStore* store);
- void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
- void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+ void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize);
+ void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize);
bool isContentLoaded() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Tue Sep 18 12:43:29 2007
@@ -23,7 +23,7 @@
#include "DeliveryToken.h"
#include "Message.h"
#include "BrokerQueue.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/BasicGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -114,7 +114,7 @@
}
void MessageDelivery::deliver(Message::shared_ptr msg,
- framing::ChannelAdapter& channel,
+ framing::FrameHandler& handler,
DeliveryId id,
DeliveryToken::shared_ptr token,
uint16_t framesize)
@@ -123,15 +123,10 @@
//another may well have the wrong headers; however we will only
//have one content class for 0-10 proper
- FrameHandler& handler = channel.getHandlers().out;
-
- //send method
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
AMQFrame method = t->sendMethod(msg, id);
method.setEof(false);
- method.setChannel(channel.getId());
handler.handle(method);
-
- msg->sendHeader(handler, channel.getId(), framesize);
- msg->sendContent(handler, channel.getId(), framesize);
+ msg->sendHeader(handler, framesize);
+ msg->sendContent(handler, framesize);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h Tue Sep 18 12:43:29 2007
@@ -23,15 +23,9 @@
*/
#include <boost/shared_ptr.hpp>
#include "DeliveryId.h"
+#include "qpid/framing/FrameHandler.h"
namespace qpid {
-
-namespace framing {
-
-class ChannelAdapter;
-
-}
-
namespace broker {
class DeliveryToken;
@@ -49,7 +43,7 @@
u_int8_t confirmMode,
u_int8_t acquireMode);
- static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel,
+ static void deliver(boost::shared_ptr<Message> msg, framing::FrameHandler& out,
DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Tue Sep 18 12:43:29 2007
@@ -36,8 +36,8 @@
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
- : HandlerImplType(parent) {}
+MessageHandlerImpl::MessageHandlerImpl(Session& session)
+ : HandlerImpl(session) {}
//
// Message class method handlers
@@ -46,7 +46,7 @@
void
MessageHandlerImpl::cancel(const string& destination )
{
- session.cancel(destination);
+ getSession().cancel(destination);
}
void
@@ -97,14 +97,14 @@
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
- if(!destination.empty() && session.exists(destination))
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ if(!destination.empty() && getSession().exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
//NB: am assuming pre-acquired = 0 as discussed on SIG list as is
//the previously expected behaviour
- session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -117,9 +117,9 @@
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
- if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -148,14 +148,14 @@
bool /*global*/ )
{
//TODO: handle global
- session.setPrefetchSize(prefetchSize);
- session.setPrefetchCount(prefetchCount);
+ getSession().setPrefetchSize(prefetchSize);
+ getSession().setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- session.recover(requeue);
+ getSession().recover(requeue);
}
void
@@ -166,7 +166,7 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.reject(i->getValue(), (++i)->getValue());
+ getSession().reject(i->getValue(), (++i)->getValue());
}
}
@@ -175,10 +175,10 @@
if (unit == 0) {
//message
- session.addMessageCredit(destination, value);
+ getSession().addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- session.addByteCredit(destination, value);
+ getSession().addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +190,10 @@
{
if (mode == 0) {
//credit
- session.setCreditMode(destination);
+ getSession().setCreditMode(destination);
} else if (mode == 1) {
//window
- session.setWindowMode(destination);
+ getSession().setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -201,12 +201,12 @@
void MessageHandlerImpl::flush(const std::string& destination)
{
- session.flush(destination);
+ getSession().flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- session.stop(destination);
+ getSession().stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,11 +218,11 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.acquire(i->getValue(), (++i)->getValue(), results);
+ getSession().acquire(i->getValue(), (++i)->getValue(), results);
}
results = results.condense();
- client.acquired(results);
+ getProxy().getMessage().acquired(results);
}
void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
@@ -232,7 +232,7 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- session.release(i->getValue(), (++i)->getValue());
+ getSession().release(i->getValue(), (++i)->getValue());
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -34,10 +34,10 @@
class MessageHandlerImpl :
public framing::AMQP_ServerOperations::MessageHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Message>
+ public HandlerImpl
{
public:
- MessageHandlerImpl(CoreRefs& parent);
+ MessageHandlerImpl(Session&);
void append(const std::string& reference, const std::string& bytes);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Sep 18 12:43:29 2007
@@ -37,13 +37,7 @@
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(Session& s) :
- session(s),
- connection(s.getAdapter()->getConnection()),
- adapter(s, static_cast<ChannelAdapter&>(*this))
-{
- init(s.getAdapter()->getChannel(), s.out, 0);
-}
+SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
void SemanticHandler::handle(framing::AMQFrame& frame)
{
@@ -86,24 +80,24 @@
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- session.ackCumulative(mark.getValue());
+ getSession().ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
void SemanticHandler::sendCompletion()
{
- if (isOpen()) {
+ if (getSessionHandler()) {
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
+ getProxy().getExecution().complete(mark.getValue(), range);
}
}
void SemanticHandler::flush()
@@ -129,7 +123,8 @@
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = incoming.next();
+ BrokerAdapter adapter(getSession());
InvocationVisitor v(&adapter);
method->accept(v);
incoming.complete(id);
@@ -137,7 +132,7 @@
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
+ getProxy().getExecution().result(id.getValue(), v.getResult());
}
//TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
//TODO: if window gets too large send unsolicited completion
@@ -159,45 +154,24 @@
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&connection);
- session.handle(msg);
+ msg->setPublisher(&getConnection());
+ getSession().handle(msg);
msgBuilder.end();
incoming.track(msg);
//TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
}
}
-bool SemanticHandler::isOpen() const {
- // FIXME aconway 2007-08-30: remove.
- return true;
-}
-
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
return outgoing.hwm;
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
-}
-
-void SemanticHandler::send(const AMQBody& body)
-{
- Mutex::ScopedLock l(outLock);
- // FIXME aconway 2007-08-31: SessionHandler should not send
- // channel/session commands via the semantic handler, it should shortcut
- // directly to its own output handler. That will make the CLASS_ID
- // part of the test unnecessary.
- //
- if (body.getMethod() &&
- body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
- {
- ++outgoing.hwm;
- }
- ChannelAdapter::send(body);
+ MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
}
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
@@ -225,11 +199,3 @@
throw Exception("Could not determine track");
}
-//ChannelAdapter virtual methods, no longer used:
-void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
-
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
-
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
-
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Sep 18 12:43:29 2007
@@ -26,12 +26,12 @@
#include "DeliveryAdapter.h"
#include "MessageBuilder.h"
#include "IncomingExecutionContext.h"
+#include "HandlerImpl.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
@@ -49,11 +49,8 @@
class SemanticHandler : public DeliveryAdapter,
public framing::FrameHandler,
public framing::AMQP_ServerOperations::ExecutionHandler,
- private framing::ChannelAdapter
+ private HandlerImpl
{
- Session& session;
- Connection& connection;
- BrokerAdapter adapter;
IncomingExecutionContext incoming;
framing::Window outgoing;
sys::Mutex outLock;
@@ -68,17 +65,6 @@
void sendCompletion();
- //ChannelAdapter virtual methods:
- void handleMethod(framing::AMQMethodBody* method);
-
- bool isOpen() const;
- void handleHeader(framing::AMQHeaderBody*);
- void handleContent(framing::AMQContentBody*);
- void handleHeartbeat(framing::AMQHeartbeatBody*);
-
- void send(const framing::AMQBody& body);
-
-
//delivery adapter methods:
DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
@@ -88,9 +74,6 @@
//frame handler:
void handle(framing::AMQFrame& frame);
-
- // FIXME aconway 2007-08-31: Move proxy to Session.
- framing::AMQP_ClientProxy& getProxy() { return adapter.getProxy(); }
//execution class method handlers:
void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Tue Sep 18 12:43:29 2007
@@ -68,11 +68,9 @@
flowActive(true)
{
outstanding.reset();
- // FIXME aconway 2007-08-29: handler to get Session, not connection.
std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
+ // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState.
deliveryAdapter=semantic.get();
- // FIXME aconway 2007-08-31: Remove, workaround.
- semanticHandler=semantic.get();
handlers.push_back(semantic.release());
in = &handlers[0];
out = &adapter->out;
@@ -256,7 +254,7 @@
bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) {
+ if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
return false;
} else {
if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
@@ -306,7 +304,7 @@
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
- parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(),
+ parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
}
}
@@ -333,7 +331,7 @@
void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
+ cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Tue Sep 18 12:43:29 2007
@@ -32,7 +32,6 @@
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
-#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AccumulatedAck.h"
#include "qpid/shared_ptr.h"
@@ -43,11 +42,6 @@
#include <vector>
namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
namespace broker {
class SessionHandler;
@@ -129,20 +123,15 @@
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
-
- // FIXME aconway 2007-08-31: remove, temporary hack.
- SemanticHandler* semanticHandler;
-
AckRange findRange(DeliveryId first, DeliveryId last);
-
public:
Session(SessionHandler&, uint32_t timeout);
~Session();
/** Returns 0 if this session is not currently attached */
- SessionHandler* getAdapter() { return adapter; }
- const SessionHandler* getAdapter() const { return adapter; }
+ SessionHandler* getHandler() { return adapter; }
+ const SessionHandler* getHandler() const { return adapter; }
Broker& getBroker() const { return broker; }
@@ -198,13 +187,7 @@
void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
-
void handle(Message::shared_ptr msg);
-
- framing::AMQP_ClientProxy& getProxy() {
- // FIXME aconway 2007-08-31: Move proxy to Session.
- return semanticHandler->getProxy();
- }
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Sep 18 12:43:29 2007
@@ -29,18 +29,10 @@
namespace broker {
using namespace framing;
-// FIXME aconway 2007-08-31: the SessionHandler should create its
-// private proxy directly on the connections out handler.
-// Session/channel methods should not go thru the other layers.
-// Need to get rid of ChannelAdapter and allow proxies to be created
-// directly on output handlers.
-//
-framing::AMQP_ClientProxy& SessionHandler::getProxy() {
- return session->getProxy();
-}
-
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), ignoring(false), channelHandler(*this) {}
+ : InOutHandler(0, &c.getOutput()),
+ connection(c), channel(ch), proxy(out),
+ ignoring(false), channelHandler(*this) {}
SessionHandler::~SessionHandler() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Sep 18 12:43:29 2007
@@ -24,14 +24,10 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
namespace broker {
class Connection;
@@ -51,12 +47,17 @@
~SessionHandler();
/** Returns 0 if not attached to a session */
- Session* getSession() const { return session.get(); }
+ Session* getSession() { return session.get(); }
+ const Session* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel; }
+
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+ const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -84,10 +85,9 @@
void assertOpen(const char* method);
void assertClosed(const char* method);
- framing::AMQP_ClientProxy& getProxy();
-
Connection& connection;
const framing::ChannelId channel;
+ framing::AMQP_ClientProxy proxy;
shared_ptr<Session> session;
bool ignoring;
ChannelMethods channelHandler;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Tue Sep 18 12:43:29 2007
@@ -21,7 +21,7 @@
#include "SendContent.h"
-qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c),
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t mfs, uint efc) : handler(h),
maxFrameSize(mfs),
expectedFrameCount(efc), frameCount(0) {}
@@ -45,14 +45,13 @@
} else {
AMQFrame copy(f);
setFlags(copy, first, last);
- copy.setChannel(channel);
handler.handle(copy);
}
}
void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
{
- AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+ AMQFrame fragment(0, AMQContentBody(body.getData().substr(offset, size)));
setFlags(fragment, first, last);
handler.handle(fragment);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Tue Sep 18 12:43:29 2007
@@ -37,7 +37,6 @@
class SendContent
{
mutable FrameHandler& handler;
- const uint16_t channel;
const uint16_t maxFrameSize;
uint expectedFrameCount;
uint frameCount;
@@ -45,7 +44,7 @@
void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const;
void setFlags(AMQFrame& f, bool first, bool last) const;
public:
- SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount);
+ SendContent(FrameHandler& _handler, uint16_t _maxFrameSize, uint frameCount);
void operator()(const AMQFrame& f);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Tue Sep 18 12:43:29 2007
@@ -82,22 +82,6 @@
void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); }
};
-class Relay
-{
- FrameHandler& handler;
- const uint16_t channel;
-
-public:
- Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {}
-
- void operator()(AMQFrame& f)
- {
- AMQFrame copy(f);
- copy.setChannel(channel);
- handler.handle(copy);
- }
-};
-
class Print
{
std::ostream& out;