You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/31 22:51:23 UTC
svn commit: r571575 [1/2] - in /incubator/qpid/trunk/qpid/cpp:
rubygen/templates/ src/ src/qpid/broker/ src/qpid/cluster/
src/qpid/framing/ src/tests/
Author: aconway
Date: Fri Aug 31 13:51:22 2007
New Revision: 571575
URL: http://svn.apache.org/viewvc?rev=571575&view=rev
Log:
* Summary:
- Moved BrokerChannel functionality into Session.
- Moved ChannelHandler methods handling into SessionAdapter.
- Updated all handlers to use session.
(We're still using AMQP channel methods in SessionAdapter)
Roles & responsibilities:
Session:
- represents an _open_ session, may be active or suspended.
- ows all session state including handler chains.
- attahced to SessionAdapter when active, not when suspended.
SessionAdapter:
- reprents the association of a channel with a session.
- owned by Connection, kept in the session map.
- channel open == SessionAdapter.getSessio() != 0
Anything that depends on attachment to a channel, connection or
protocol should be in SessionAdpater. Anything that suvives a
session suspend belongs in Session.
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
incubator/qpid/trunk/qpid/cpp/src/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb Fri Aug 31 13:51:22 2007
@@ -13,14 +13,13 @@
def generate()
h_file("#{@dir}/constants") {
namespace(@namespace) {
- @amqp.constants.each { |c|
- genl "enum { #{c.name.shout} = #{c.value} };"
+ scope("enum AmqpConstant {","};") {
+ genl @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }.join(",\n")
}
}
}
h_file("#{@dir}/reply_exceptions") {
- include "constants"
include "qpid/Exception"
namespace(@namespace) {
@amqp.constants.each { |c|
@@ -35,6 +34,7 @@
}
}
}
+
end
end
Propchange: incubator/qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Aug 31 13:51:22 2007
@@ -13,3 +13,4 @@
generate.timestamp
rubygen.timestamp
generate_MethodHolderMaxSize_h
+.Tpo
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Aug 31 13:51:22 2007
@@ -159,7 +159,6 @@
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
- qpid/broker/BrokerChannel.cpp \
qpid/broker/BrokerExchange.cpp \
qpid/broker/BrokerQueue.cpp \
qpid/broker/Connection.cpp \
@@ -227,7 +226,6 @@
nobase_include_HEADERS = \
$(platform_hdr) \
qpid/broker/AccumulatedAck.h \
- qpid/broker/BrokerChannel.h \
qpid/broker/BrokerExchange.h \
qpid/broker/BrokerQueue.h \
qpid/broker/Consumer.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Aug 31 13:51:22 2007
@@ -3,24 +3,24 @@
#
lib_LTLIBRARIES += libqpidcluster.la
-if CLUSTER
+# if CLUSTER
-libqpidcluster_la_SOURCES = \
- qpid/cluster/Cluster.cpp \
- qpid/cluster/Cluster.h \
- qpid/cluster/Cpg.cpp \
- qpid/cluster/Cpg.h \
- qpid/cluster/Dispatchable.h \
- qpid/cluster/ClusterPlugin.cpp \
- qpid/cluster/ClassifierHandler.h \
- qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/SessionManager.h \
- qpid/cluster/SessionManager.cpp
+# libqpidcluster_la_SOURCES = \
+# qpid/cluster/Cluster.cpp \
+# qpid/cluster/Cluster.h \
+# qpid/cluster/Cpg.cpp \
+# qpid/cluster/Cpg.h \
+# qpid/cluster/Dispatchable.h \
+# qpid/cluster/ClusterPlugin.cpp \
+# qpid/cluster/ClassifierHandler.h \
+# qpid/cluster/ClassifierHandler.cpp \
+# qpid/cluster/SessionManager.h \
+# qpid/cluster/SessionManager.cpp
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+# libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
-else
+# else
# Empty stub library to satisfy rpm spec file.
libqpidcluster_la_SOURCES =
-endif
+#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -15,10 +15,9 @@
* limitations under the License.
*
*/
-#include <boost/format.hpp>
-
#include "BrokerAdapter.h"
-#include "BrokerChannel.h"
+#include "Session.h"
+#include "SessionAdapter.h"
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
@@ -28,18 +27,23 @@
namespace qpid {
namespace broker {
-using boost::format;
using namespace qpid;
using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-
- BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
- CoreRefs(ch, c, b, a),
- connection(c),
+// FIXME aconway 2007-08-31: now that functionality is distributed
+// between different handlers, BrokerAdapter should be dropped.
+// Instead the individual class Handler interfaces can be implemented
+// by the handlers responsible for those classes.
+//
+
+BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
+ CoreRefs(s,
+ s.getAdapter()->getConnection(),
+ s.getAdapter()->getConnection().broker,
+ a),
basicHandler(*this),
- channelHandler(*this),
exchangeHandler(*this),
bindingHandler(*this),
messageHandler(*this),
@@ -52,31 +56,6 @@
ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-
-void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
- channel.open();
- client.openOk();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
- channel.flow(active);
- client.flowOk(active);
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
-
-void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
- const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk();
- // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
- connection.closeChannel(channel.getId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
-
-
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
const string& alternateExchange,
@@ -148,10 +127,10 @@
}
BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
- const std::string& exchangeName,
- const std::string& queueName,
- const std::string& key,
- const framing::FieldTable& args)
+ const std::string& exchangeName,
+ const std::string& queueName,
+ const std::string& key,
+ const framing::FieldTable& args)
{
Exchange::shared_ptr exchange;
try {
@@ -181,7 +160,7 @@
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = getQueue(name);
+ Queue::shared_ptr queue = session.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -205,7 +184,7 @@
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ queue = session.getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -216,7 +195,7 @@
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- channel.setDefaultQueue(queue);
+ session.setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -236,17 +215,16 @@
}
}
if (exclusive && !queue->isExclusiveOwner(&connection))
- throw ChannelException(
- 405,
- format("Cannot grant exclusive access to queue '%s'")
- % queue->getName());
+ throw ResourceLockedException(
+ QPID_MSG("Cannot grant exclusive access to queue "
+ << queue->getName()));
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -257,23 +235,23 @@
}
}
}else{
- throw ChannelException(
- 404, "Bind failed. No such exchange: " + exchangeName);
+ throw NotFoundException(
+ "Bind failed. No such exchange: " + exchangeName);
}
}
void
BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
+ const string& queueName,
+ const string& exchangeName,
+ const string& routingKey,
+ const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = getQueue(queueName);
- if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+ if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
@@ -282,17 +260,16 @@
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- getQueue(queue)->purge();
+ session.getQueue(queue)->purge();
}
-void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty){
+void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = getQueue(queue);
+ Queue::shared_ptr q = session.getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
+ throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
+ throw PreconditionFailedException("Queue in use.");
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&connection)){
@@ -310,18 +287,18 @@
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
+ session.setPrefetchSize(prefetchSize);
+ session.setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = getQueue(queueName);
- if(!consumerTag.empty() && channel.exists(consumerTag)){
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if(!consumerTag.empty() && session.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -329,7 +306,7 @@
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+ session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -338,13 +315,13 @@
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- channel.cancel(consumerTag);
+ session.cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!channel.get(token, queue, !noAck)){
+ if(!session.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId);
@@ -353,9 +330,9 @@
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- channel.ackCumulative(deliveryTag);
+ session.ackCumulative(deliveryTag);
} else {
- channel.ackRange(deliveryTag, deliveryTag);
+ session.ackRange(deliveryTag, deliveryTag);
}
}
@@ -363,29 +340,24 @@
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- channel.recover(requeue);
+ session.recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- channel.startTx();
+ session.startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- channel.commit(&broker.getStore());
+ session.commit(&broker.getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- channel.rollback();
- channel.recover(false);
+ session.rollback();
+ session.recover(false);
}
-void BrokerAdapter::ChannelHandlerImpl::ok()
-{
- //no specific action required, generic response handling should be sufficient
-}
-
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Fri Aug 31 13:51:22 2007
@@ -18,11 +18,13 @@
* limitations under the License.
*
*/
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "DtxHandlerImpl.h"
#include "HandlerImpl.h"
#include "MessageHandlerImpl.h"
-#include "DtxHandlerImpl.h"
+#include "NameGenerator.h"
#include "qpid/Exception.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace broker {
@@ -48,18 +50,17 @@
* Per-channel protocol adapter.
*
* A container for a collection of AMQP-class adapters that translate
- * AMQP method bodies into calls on the core Channel, Connection and
- * Broker objects. Each adapter class also provides a client proxy
- * to send methods to the peer.
+ * AMQP method bodies into calls on the core Broker objects. Each
+ * adapter class also provides a client proxy to send methods to the
+ * peer.
*
*/
class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a);
+ BrokerAdapter(Session& session, framing::ChannelAdapter& a);
framing::ProtocolVersion getVersion() const;
- ChannelHandler* getChannelHandler() { return &channelHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
BindingHandler* getBindingHandler() { return &bindingHandler; }
@@ -67,46 +68,27 @@
TxHandler* getTxHandler() { return &txHandler; }
MessageHandler* getMessageHandler() { return &messageHandler; }
AccessHandler* getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented"); }
+ throw framing::NotImplementedException("Access class not implemented"); }
FileHandler* getFileHandler() {
- throw ConnectionException(540, "File class not implemented"); }
+ throw framing::NotImplementedException("File class not implemented"); }
StreamHandler* getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented"); }
+ throw framing::NotImplementedException("Stream class not implemented"); }
TunnelHandler* getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented"); }
- SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); }
-
+ throw framing::NotImplementedException("Tunnel class not implemented"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
- ConnectionHandler* getConnectionHandler() {
- throw ConnectionException(503, "Can't access connection class on non-zero channel!");
- }
+ // Handlers no longer implemented in BrokerAdapter:
+#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+ ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
+ SessionHandler* getSessionHandler() { BADHANDLER(); }
+ ChannelHandler* getChannelHandler() { BADHANDLER(); }
+#undef BADHANDLER
framing::AMQP_ClientProxy& getProxy() { return proxy; }
private:
-
- class ChannelHandlerImpl :
- public ChannelHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Channel>
- {
- public:
- ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void open(const std::string& outOfBand);
- void flow(bool active);
- void flowOk(bool active);
- void ok( );
- void ping( );
- void pong( );
- void resume( const std::string& channelId );
- void close(uint16_t replyCode, const
- std::string& replyText, uint16_t classId, uint16_t methodId);
- void closeOk();
- };
-
class ExchangeHandlerImpl :
public ExchangeHandler,
public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
@@ -201,9 +183,7 @@
void rollback();
};
- Connection& connection;
BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
ExchangeHandlerImpl exchangeHandler;
BindingHandlerImpl bindingHandler;
MessageHandlerImpl messageHandler;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Aug 31 13:51:22 2007
@@ -23,7 +23,7 @@
#include <assert.h>
#include "Connection.h"
-#include "BrokerChannel.h"
+#include "Session.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
@@ -52,12 +52,8 @@
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
- // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr.
- // OLD COMMENT:
- // Assign handler to new shared_ptr, as it may be erased
- // from the map by handle() if frame is a ChannelClose.
- //
- getChannel((frame.getChannel())).in(frame);
+ SessionAdapter sa = getChannel(frame.getChannel());
+ sa.in(frame);
}
}
@@ -98,18 +94,12 @@
if (i != channels.end()) channels.erase(i);
}
-
-FrameHandler::Chains& Connection::getChannel(ChannelId id) {
- // FIXME aconway 2007-08-29: Assuming session on construction,
- // move this to SessionAdapter::open.
+SessionAdapter Connection::getChannel(ChannelId id) {
boost::optional<SessionAdapter>& ch = channels[id];
if (!ch) {
- ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29:
- assert(ch->getSession());
- broker.update(id, *ch->getSession());
+ ch = boost::in_place(boost::ref(*this), id);
}
- assert(ch->getSession());
- return *ch->getSession();
+ return *ch;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Aug 31 13:51:22 2007
@@ -36,7 +36,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
#include "qpid/Exception.h"
-#include "BrokerChannel.h"
+#include "Session.h"
#include "ConnectionAdapter.h"
#include "SessionAdapter.h"
@@ -45,19 +45,14 @@
namespace qpid {
namespace broker {
-class Channel;
-
class Connection : public sys::ConnectionInputHandler,
public ConnectionToken
{
public:
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
- /** Get a channel. Create if it does not already exist */
- framing::FrameHandler::Chains& getChannel(framing::ChannelId channel);
-
- /** Close a channel */
- void closeChannel(framing::ChannelId channel);
+ /** Get the SessionAdapter for channel. Create if it does not already exist */
+ SessionAdapter getChannel(framing::ChannelId channel);
/** Close the connection */
void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
@@ -84,7 +79,11 @@
void idleIn();
void closed();
+ // FIXME aconway 2007-08-30: When does closeChannel close the session?
+ void closeChannel(framing::ChannelId channel);
+
private:
+
// Use boost::optional to allow default-constructed uninitialized entries in the map.
typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -97,7 +96,6 @@
framing::AMQP_ClientProxy::Connection* client;
uint64_t stagingThreshold;
ConnectionAdapter adapter;
-
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 31 13:51:22 2007
@@ -19,7 +19,7 @@
*
*/
#include "DeliveryRecord.h"
-#include "BrokerChannel.h"
+#include "Session.h"
using namespace qpid::broker;
using std::string;
@@ -64,12 +64,12 @@
return range->covers(deliveryTag);
}
-void DeliveryRecord::redeliver(Channel* const channel) const{
+void DeliveryRecord::redeliver(Session* const session) const{
if(pull){
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- channel->deliver(msg.payload, consumerTag, deliveryTag);
+ session->deliver(msg.payload, consumerTag, deliveryTag);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 31 13:51:22 2007
@@ -32,44 +32,44 @@
#include "Prefetch.h"
namespace qpid {
- namespace broker {
- class Channel;
+namespace broker {
+class Session;
- /**
- * Record of a delivery for which an ack is outstanding.
- */
- class DeliveryRecord{
- mutable QueuedMessage msg;
- mutable Queue::shared_ptr queue;
- const std::string consumerTag;
- const DeliveryId deliveryTag;
- bool acquired;
- const bool pull;
+/**
+ * Record of a delivery for which an ack is outstanding.
+ */
+class DeliveryRecord{
+ mutable QueuedMessage msg;
+ mutable Queue::shared_ptr queue;
+ const std::string consumerTag;
+ const DeliveryId deliveryTag;
+ bool acquired;
+ const bool pull;
- public:
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+ public:
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
- void dequeue(TransactionContext* ctxt = 0) const;
- bool matches(DeliveryId tag) const;
- bool matchOrAfter(DeliveryId tag) const;
- bool after(DeliveryId tag) const;
- bool coveredBy(const AccumulatedAck* const range) const;
- void requeue() const;
- void redeliver(Channel* const) const;
- void updateByteCredit(uint32_t& credit) const;
- void addTo(Prefetch&) const;
- void subtractFrom(Prefetch&) const;
- const std::string& getConsumerTag() const { return consumerTag; }
- bool isPull() const { return pull; }
- bool isAcquired() const { return acquired; }
- void setAcquired(bool isAcquired) { acquired = isAcquired; }
+ void dequeue(TransactionContext* ctxt = 0) const;
+ bool matches(DeliveryId tag) const;
+ bool matchOrAfter(DeliveryId tag) const;
+ bool after(DeliveryId tag) const;
+ bool coveredBy(const AccumulatedAck* const range) const;
+ void requeue() const;
+ void redeliver(Session* const) const;
+ void updateByteCredit(uint32_t& credit) const;
+ void addTo(Prefetch&) const;
+ void subtractFrom(Prefetch&) const;
+ const std::string& getConsumerTag() const { return consumerTag; }
+ bool isPull() const { return pull; }
+ bool isAcquired() const { return acquired; }
+ void setAcquired(bool isAcquired) { acquired = isAcquired; }
- friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
- };
+ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
+};
- typedef std::list<DeliveryRecord>::iterator ack_iterator;
- }
+typedef std::list<DeliveryRecord>::iterator ack_iterator;
+}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Aug 31 13:51:22 2007
@@ -19,7 +19,7 @@
#include <boost/format.hpp>
#include "Broker.h"
-#include "BrokerChannel.h"
+#include "Session.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -41,7 +41,7 @@
void DtxHandlerImpl::select()
{
- channel.selectDtx();
+ session.selectDtx();
}
DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -51,7 +51,7 @@
{
try {
if (fail) {
- channel.endDtx(xid, true);
+ session.endDtx(xid, true);
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
@@ -59,9 +59,9 @@
}
} else {
if (suspend) {
- channel.suspendDtx(xid);
+ session.suspendDtx(xid);
} else {
- channel.endDtx(xid, false);
+ session.endDtx(xid, false);
}
return DtxDemarcationEndResult(XA_OK);
}
@@ -80,9 +80,9 @@
}
try {
if (resume) {
- channel.resumeDtx(xid);
+ session.resumeDtx(xid);
} else {
- channel.startDtx(xid, broker.getDtxManager(), join);
+ session.startDtx(xid, broker.getDtxManager(), join);
}
return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Fri Aug 31 13:51:22 2007
@@ -20,19 +20,13 @@
*/
#include "Broker.h"
-#include "BrokerChannel.h"
#include "qpid/framing/AMQP_ClientProxy.h"
namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
namespace broker {
- //class Channel;
class Connection;
+class Session;
/**
* A collection of references to the core objects required by an adapter,
@@ -40,36 +34,14 @@
*/
struct CoreRefs
{
- CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
- : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {}
+ CoreRefs(Session& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
+ : session(ch), connection(c), broker(b), adapter(a), proxy(a) {}
- Channel& channel;
+ Session& session;
Connection& connection;
Broker& broker;
framing::ChannelAdapter& adapter;
framing::AMQP_ClientProxy proxy;
-
- /**
- * Get named queue, never returns 0.
- * @return: named queue or default queue for channel if name=""
- * @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if name="" and channel has no default.
- */
- Queue::shared_ptr getQueue(const string& name) {
- //Note: this can be removed soon as the default queue for channels is scrapped in 0-10
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = channel.getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = broker.getQueues().find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
- }
-
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Aug 31 13:51:22 2007
@@ -18,7 +18,7 @@
#include "qpid/QpidError.h"
#include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
+#include "Session.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
@@ -45,7 +45,7 @@
void
MessageHandlerImpl::cancel(const string& destination )
{
- channel.cancel(destination);
+ session.cancel(destination);
}
void
@@ -96,12 +96,12 @@
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = getQueue(queueName);
- if(!destination.empty() && channel.exists(destination))
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if(!destination.empty() && session.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -114,9 +114,9 @@
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
- if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -145,14 +145,14 @@
bool /*global*/ )
{
//TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
+ session.setPrefetchSize(prefetchSize);
+ session.setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- channel.recover(requeue);
+ session.recover(requeue);
}
void
@@ -166,10 +166,10 @@
if (unit == 0) {
//message
- channel.addMessageCredit(destination, value);
+ session.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- channel.addByteCredit(destination, value);
+ session.addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -181,10 +181,10 @@
{
if (mode == 0) {
//credit
- channel.setCreditMode(destination);
+ session.setCreditMode(destination);
} else if (mode == 1) {
//window
- channel.setWindowMode(destination);
+ session.setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -192,12 +192,12 @@
void MessageHandlerImpl::flush(const std::string& destination)
{
- channel.flush(destination);
+ session.flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- channel.stop(destination);
+ session.stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Aug 31 13:51:22 2007
@@ -20,25 +20,29 @@
*/
#include "SemanticHandler.h"
-
-#include "boost/format.hpp"
+#include "Session.h"
+#include "SessionAdapter.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ChannelCloseOkBody.h"
+#include "Connection.h"
+#include "Session.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/ChannelOpenBody.h"
#include "qpid/framing/InvocationVisitor.h"
+#include <boost/format.hpp>
+
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c), channel(c, *this, id)
+SemanticHandler::SemanticHandler(Session& s) :
+ session(s),
+ connection(s.getAdapter()->getConnection()),
+ adapter(s, static_cast<ChannelAdapter&>(*this))
{
- init(id, connection.getOutput(), connection.getVersion());
- adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+ init(s.getAdapter()->getChannel(), s.out, 0);
}
void SemanticHandler::handle(framing::AMQFrame& frame)
@@ -60,35 +64,18 @@
//open. execute it (i.e. out-of order execution with respect to
//the command id sequence) or queue it up?
- try{
-
- TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
-
- switch(track) {
- case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
- handleL2(frame.castBody<AMQMethodBody>());
- break;
- case EXECUTION_CONTROL_TRACK:
- handleL3(frame.castBody<AMQMethodBody>());
- break;
- case MODEL_COMMAND_TRACK:
- if (!isOpen()) {
- throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
- }
- handleCommand(frame.castBody<AMQMethodBody>());
- break;
- case MODEL_CONTENT_TRACK:
- handleContent(frame);
- break;
- }
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
- }catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
+ switch(track) {
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.getMethod());
+ break;
+ case MODEL_COMMAND_TRACK:
+ handleCommand(frame.getMethod());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
}
@@ -99,13 +86,13 @@
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- channel.ackCumulative(mark.getValue());
+ session.ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -141,7 +128,7 @@
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
++(incoming.lwm);
- InvocationVisitor v(adapter.get());
+ InvocationVisitor v(&adapter);
method->accept(v);
//TODO: need to account for async store operations and interleaving
++(incoming.hwm);
@@ -153,17 +140,6 @@
}
}
-void SemanticHandler::handleL2(framing::AMQMethodBody* method)
-{
- if(!method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
- }
- } else {
- method->invoke(adapter->getChannelHandler());
- }
-}
-
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
if (!method->invoke(this)) {
@@ -181,16 +157,16 @@
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
msg->setPublisher(&connection);
- channel.handle(msg);
+ session.handle(msg);
msgBuilder.end();
//TODO: need to account for async store operations and interleaving
++(incoming.hwm);
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+bool SemanticHandler::isOpen() const {
+ // FIXME aconway 2007-08-30: remove.
+ return true;
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -210,45 +186,39 @@
void SemanticHandler::send(const AMQBody& body)
{
Mutex::ScopedLock l(outLock);
- if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //temporary hack until channel management is moved to its own handler:
+ // FIXME aconway 2007-08-31: SessionAdapter should not send
+ // channel/session commands via the semantic handler, it should shortcut
+ // directly to its own output handler. That will make the CLASS_ID
+ // part of the test unnecessary.
+ //
+ if (body.getMethod() &&
+ body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
+ {
++outgoing.hwm;
}
ChannelAdapter::send(body);
}
-uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
-{
- return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
-}
-
-uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
-{
- return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
-}
-
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
{
//will be replaced by field in 0-10 frame header
uint8_t type = frame.getBody()->type();
uint16_t classId;
switch(type) {
- case METHOD_BODY:
+ case METHOD_BODY:
if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
return MODEL_CONTENT_TRACK;
}
classId = frame.castBody<AMQMethodBody>()->amqpClassId();
switch (classId) {
- case ChannelOpenBody::CLASS_ID:
- return SESSION_CONTROL_TRACK;
- case ExecutionCompleteBody::CLASS_ID:
+ case ExecutionCompleteBody::CLASS_ID:
return EXECUTION_CONTROL_TRACK;
}
return MODEL_COMMAND_TRACK;
- case HEADER_BODY:
- case CONTENT_BODY:
+ case HEADER_BODY:
+ case CONTENT_BODY:
return MODEL_CONTENT_TRACK;
}
throw Exception("Could not determine track");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Aug 31 13:51:22 2007
@@ -22,14 +22,15 @@
#define _SemanticHandler_
#include <memory>
-#include "BrokerChannel.h"
-#include "Connection.h"
+#include "BrokerAdapter.h"
#include "DeliveryAdapter.h"
#include "MessageBuilder.h"
+
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
@@ -42,34 +43,31 @@
namespace broker {
-class BrokerAdapter;
-class framing::ChannelAdapter;
+class Session;
-class SemanticHandler : private framing::ChannelAdapter,
- private DeliveryAdapter,
- public framing::FrameHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler
+class SemanticHandler : public DeliveryAdapter,
+ public framing::FrameHandler,
+ public framing::AMQP_ServerOperations::ExecutionHandler,
+ private framing::ChannelAdapter
{
+ Session& session;
Connection& connection;
- Channel channel;
- std::auto_ptr<BrokerAdapter> adapter;
+ BrokerAdapter adapter;
framing::Window incoming;
framing::Window outgoing;
sys::Mutex outLock;
MessageBuilder msgBuilder;
- enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+ enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
TrackId getTrack(const framing::AMQFrame& frame);
- uint16_t getClassId(const framing::AMQFrame& frame);
- uint16_t getMethodId(const framing::AMQFrame& frame);
void handleL3(framing::AMQMethodBody* method);
- void handleL2(framing::AMQMethodBody* method);
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
void handleMethod(framing::AMQMethodBody* method);
+
bool isOpen() const;
void handleHeader(framing::AMQHeaderBody*);
void handleContent(framing::AMQContentBody*);
@@ -83,10 +81,13 @@
void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
public:
- SemanticHandler(framing::ChannelId id, Connection& c);
+ SemanticHandler(Session& session);
//frame handler:
void handle(framing::AMQFrame& frame);
+
+ // FIXME aconway 2007-08-31: Move proxy to Session.
+ framing::AMQP_ClientProxy& getProxy() { return adapter.getProxy(); }
//execution class method handlers:
void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Fri Aug 31 13:51:22 2007
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,22 +20,543 @@
*/
#include "Session.h"
+
+#include "BrokerAdapter.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
+#include "DtxTimeout.h"
+#include "Message.h"
#include "SemanticHandler.h"
#include "SessionAdapter.h"
+#include "TxAck.h"
+#include "TxPublish.h"
+#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <assert.h>
+
namespace qpid {
namespace broker {
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
Session::Session(SessionAdapter& a, uint32_t t)
- : adapter(&a), timeout(t)
+ : adapter(&a),
+ broker(adapter->getConnection().broker),
+ timeout(t),
+ prefetchSize(0),
+ prefetchCount(0),
+ tagGenerator("sgen"),
+ dtxSelected(false),
+ accumulatedAck(0),
+ flowActive(true)
{
- assert(adapter);
+ outstanding.reset();
// FIXME aconway 2007-08-29: handler to get Session, not connection.
- handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection()));
+ std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
+ deliveryAdapter=semantic.get();
+ // FIXME aconway 2007-08-31: Remove, workaround.
+ semanticHandler=semantic.get();
+ handlers.push_back(semantic.release());
in = &handlers[0];
- out = &adapter->getConnection().getOutput();
+ out = &adapter->out;
+ // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
+ // create a SessionManager in the broker for all session related
+ // stuff: suspended sessions, handler updaters etc.
+ // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
+ broker.update(a.getChannel(), *this);
+}
+
+Session::~Session() {
+ close();
+}
+
+bool Session::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
+ Queue::shared_ptr queue, bool nolocal, bool acks,
+ bool exclusive, const FieldTable*)
+{
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
+}
+
+void Session::cancel(const string& tag){
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
+}
+
+void Session::close()
+{
+ opened = false;
+ consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
+}
+
+void Session::startTx()
+{
+ txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+}
+
+void Session::commit(MessageStore* const store)
+{
+ if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+ TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ txBuffer->enlist(txAck);
+ if (txBuffer->commitLocal(store)) {
+ accumulatedAck.clear();
+ }
+}
+
+void Session::rollback()
+{
+ if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+ txBuffer->rollback();
+ accumulatedAck.clear();
+}
+
+void Session::selectDtx()
+{
+ dtxSelected = true;
+}
+
+void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
+ if (!dtxSelected) {
+ throw ConnectionException(503, "Session has not been selected for use with dtx");
+ }
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+ if (join) {
+ mgr.join(xid, dtxBuffer);
+ } else {
+ mgr.start(xid, dtxBuffer);
+ }
+}
+
+void Session::endDtx(const std::string& xid, bool fail)
+{
+ if (!dtxBuffer) {
+ throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+ }
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
+ % dtxBuffer->getXid() % xid);
+ }
+
+ txBuffer.reset();//ops on this session no longer transactional
+
+ checkDtxTimeout();
+ if (fail) {
+ dtxBuffer->fail();
+ } else {
+ dtxBuffer->markEnded();
+ }
+ dtxBuffer.reset();
+}
+
+void Session::suspendDtx(const std::string& xid)
+{
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
+ % dtxBuffer->getXid() % xid);
+ }
+ txBuffer.reset();//ops on this session no longer transactional
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(true);
+}
+
+void Session::resumeDtx(const std::string& xid)
+{
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
+ % dtxBuffer->getXid() % xid);
+ }
+ if (!dtxBuffer->isSuspended()) {
+ throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ }
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(false);
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
+void Session::checkDtxTimeout()
+{
+ if (dtxBuffer->isExpired()) {
+ dtxBuffer.reset();
+ throw DtxTimeoutException();
+ }
+}
+
+void Session::record(const DeliveryRecord& delivery)
+{
+ unacked.push_back(delivery);
+ delivery.addTo(outstanding);
+}
+
+bool Session::checkPrefetch(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Session::ConsumerImpl::ConsumerImpl(Session* _parent,
+ DeliveryToken::shared_ptr _token,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _nolocal,
+ bool _acquire
+) : parent(_parent),
+ token(_token),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
+ nolocal(_nolocal),
+ acquire(_acquire),
+ blocked(false),
+ windowing(true),
+ msgCredit(0xFFFFFFFF),
+ byteCredit(0xFFFFFFFF) {}
+
+bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+{
+ if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) {
+ return false;
+ } else {
+ if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
+ blocked = true;
+ } else {
+ blocked = false;
+
+ Mutex::ScopedLock locker(parent->deliveryLock);
+
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter->deliver(msg.payload, token);
+ if (ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+ }
+ }
+ return !blocked;
+ }
+}
+
+bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock l(lock);
+ if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+ return false;
+ } else {
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
+ }
+ return true;
+ }
+}
+
+void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+ Mutex::ScopedLock locker(parent->deliveryLock);
+ parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+}
+
+Session::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
+void Session::ConsumerImpl::cancel()
+{
+ if(queue) {
+ queue->cancel(this);
+ if (queue->canAutoDelete()) {
+ parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ }
+ }
+}
+
+void Session::ConsumerImpl::requestDispatch()
+{
+ if(blocked)
+ queue->requestDispatch();
+}
+
+void Session::handle(Message::shared_ptr msg) {
+ if (txBuffer.get()) {
+ TxPublish* deliverable(new TxPublish(msg));
+ TxOp::shared_ptr op(deliverable);
+ route(msg, *deliverable);
+ txBuffer->enlist(op);
+ } else {
+ DeliverableMessage deliverable(msg);
+ route(msg, deliverable);
+ }
+}
+void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+ std::string exchangeName = msg->getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
+ }
+
+ cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+
+ if (!strategy.delivered) {
+ //TODO:if reject-unroutable, then reject
+ //else route to alternate exchange
+ if (cacheExchange->getAlternate()) {
+ cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ }
+
+}
+
+void Session::ackCumulative(DeliveryId id)
+{
+ ack(id, id, true);
+}
+
+void Session::ackRange(DeliveryId first, DeliveryId last)
+{
+ ack(first, last, false);
+}
+
+void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
+ }
+
+ for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
+
+ //if the prefetch limit had previously been reached, or credit
+ //had expired in windowing mode there may be messages that can
+ //be now be delivered
+ for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Session::acknowledged(const DeliveryRecord& delivery)
+{
+ delivery.subtractFrom(outstanding);
+ ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ if (i != consumers.end()) {
+ i->acknowledged(delivery);
+ }
+}
+
+void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ }
+}
+
+void Session::recover(bool requeue)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ //take copy and clear unacked as requeue may result in redelivery to this session
+ //which will in turn result in additions to unacked
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
+ QueuedMessage msg = queue->dequeue();
+ if(msg.payload){
+ Mutex::ScopedLock locker(deliveryLock);
+ DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ DeliveryId deliveryTag)
+{
+ ConsumerImplMap::iterator i = consumers.find(consumerTag);
+ if (i != consumers.end()){
+ i->redeliver(msg, deliveryTag);
+ }
+}
+
+void Session::flow(bool active)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+ }
+}
+
+
+Session::ConsumerImpl& Session::find(const std::string& destination)
+{
+ ConsumerImplMap::iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ } else {
+ return *i;
+ }
+}
+
+void Session::setWindowMode(const std::string& destination)
+{
+ find(destination).setWindowMode();
+}
+
+void Session::setCreditMode(const std::string& destination)
+{
+ find(destination).setCreditMode();
+}
+
+void Session::addByteCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addByteCredit(value);
+}
+
+
+void Session::addMessageCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addMessageCredit(value);
+}
+
+void Session::flush(const std::string& destination)
+{
+ ConsumerImpl& c = find(destination);
+ c.flush();
+}
+
+
+void Session::stop(const std::string& destination)
+{
+ find(destination).stop();
+}
+
+void Session::ConsumerImpl::setWindowMode()
+{
+ windowing = true;
+}
+
+void Session::ConsumerImpl::setCreditMode()
+{
+ windowing = false;
+}
+
+void Session::ConsumerImpl::addByteCredit(uint32_t value)
+{
+ byteCredit += value;
+ requestDispatch();
+}
+
+void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+ msgCredit += value;
+ requestDispatch();
+}
+
+void Session::ConsumerImpl::flush()
+{
+ //TODO: need to wait until any messages that are available for
+ //this consumer have been delivered... i.e. some sort of flush on
+ //the queue...
+}
+
+void Session::ConsumerImpl::stop()
+{
+ msgCredit = 0;
+ byteCredit = 0;
+}
+
+Queue::shared_ptr Session::getQueue(const string& name) const {
+ //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getDefaultQueue();
+ if (!queue)
+ throw NotAllowedException(QPID_MSG("No queue name specified."));
+ }
+ else {
+ queue = getBroker().getQueues().find(name);
+ if (!queue)
+ throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
+ return queue;
+}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Fri Aug 31 13:51:22 2007
@@ -2,6 +2,7 @@
#define QPID_BROKER_SESSION_H
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,14 +22,35 @@
*
*/
+#include "AccumulatedAck.h"
+#include "Consumer.h"
+#include "Deliverable.h"
+#include "DeliveryAdapter.h"
+#include "DeliveryRecord.h"
+#include "DeliveryToken.h"
+#include "DtxBuffer.h"
+#include "DtxManager.h"
+#include "NameGenerator.h"
+#include "Prefetch.h"
+#include "TxBuffer.h"
+#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove
#include "qpid/framing/FrameHandler.h"
+#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
+#include <list>
+
namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
namespace broker {
class SessionAdapter;
+class Broker;
/**
* Session holds the state of an open session, whether attached to a
@@ -38,19 +60,145 @@
class Session : public framing::FrameHandler::Chains,
private boost::noncopyable
{
+ class ConsumerImpl : public Consumer
+ {
+ sys::Mutex lock;
+ Session* const parent;
+ const DeliveryToken::shared_ptr token;
+ const string name;
+ const Queue::shared_ptr queue;
+ const bool ackExpected;
+ const bool nolocal;
+ const bool acquire;
+ bool blocked;
+ bool windowing;
+ uint32_t msgCredit;
+ uint32_t byteCredit;
+
+ bool checkCredit(Message::shared_ptr& msg);
+
+ public:
+ ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token,
+ const string& name, Queue::shared_ptr queue,
+ bool ack, bool nolocal, bool acquire=true);
+ ~ConsumerImpl();
+ bool deliver(QueuedMessage& msg);
+ void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
+ void cancel();
+ void requestDispatch();
+
+ void setWindowMode();
+ void setCreditMode();
+ void addByteCredit(uint32_t value);
+ void addMessageCredit(uint32_t value);
+ void flush();
+ void stop();
+ void acknowledged(const DeliveryRecord&);
+ };
+
+ typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+
+ SessionAdapter* adapter;
+ Broker& broker;
+ uint32_t timeout;
+ boost::ptr_vector<framing::FrameHandler> handlers;
+
+ DeliveryAdapter* deliveryAdapter;
+ Queue::shared_ptr defaultQueue;
+ ConsumerImplMap consumers;
+ uint32_t prefetchSize;
+ uint16_t prefetchCount;
+ Prefetch outstanding;
+ NameGenerator tagGenerator;
+ std::list<DeliveryRecord> unacked;
+ sys::Mutex deliveryLock;
+ TxBuffer::shared_ptr txBuffer;
+ DtxBuffer::shared_ptr dtxBuffer;
+ bool dtxSelected;
+ AccumulatedAck accumulatedAck;
+ bool opened;
+ bool flowActive;
+
+ boost::shared_ptr<Exchange> cacheExchange;
+
+ void route(Message::shared_ptr msg, Deliverable& strategy);
+ void record(const DeliveryRecord& delivery);
+ bool checkPrefetch(Message::shared_ptr& msg);
+ void checkDtxTimeout();
+ ConsumerImpl& find(const std::string& destination);
+ void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
+ void acknowledged(const DeliveryRecord&);
+
+ // FIXME aconway 2007-08-31: remove, temporary hack.
+ SemanticHandler* semanticHandler;
+
+
public:
Session(SessionAdapter&, uint32_t timeout);
+ ~Session();
/** Returns 0 if this session is not currently attached */
SessionAdapter* getAdapter() { return adapter; }
const SessionAdapter* getAdapter() const { return adapter; }
+ Broker& getBroker() const { return broker; }
+
+ /** Session timeout. */
uint32_t getTimeout() const { return timeout; }
- private:
- SessionAdapter* adapter;
- uint32_t timeout;
- boost::ptr_vector<framing::FrameHandler> handlers;
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for session if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if name="" and session has no default.
+ */
+ Queue::shared_ptr getQueue(const std::string& name) const;
+
+
+ void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
+ uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
+ uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
+
+ bool exists(const string& consumerTag);
+
+ /**
+ *@param tagInOut - if empty it is updated with the generated token.
+ */
+ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
+ bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
+
+ void cancel(const string& tag);
+
+ void setWindowMode(const std::string& destination);
+ void setCreditMode(const std::string& destination);
+ void addByteCredit(const std::string& destination, uint32_t value);
+ void addMessageCredit(const std::string& destination, uint32_t value);
+ void flush(const std::string& destination);
+ void stop(const std::string& destination);
+
+ bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
+ void close();
+ void startTx();
+ void commit(MessageStore* const store);
+ void rollback();
+ void selectDtx();
+ void startDtx(const std::string& xid, DtxManager& mgr, bool join);
+ void endDtx(const std::string& xid, bool fail);
+ void suspendDtx(const std::string& xid);
+ void resumeDtx(const std::string& xid);
+ void ackCumulative(DeliveryId deliveryTag);
+ void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
+ void recover(bool requeue);
+ void flow(bool active);
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
+
+ void handle(Message::shared_ptr msg);
+
+ framing::AMQP_ClientProxy& getProxy() {
+ // FIXME aconway 2007-08-31: Move proxy to Session.
+ return semanticHandler->getProxy();
+ }
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -19,27 +19,128 @@
*/
#include "SessionAdapter.h"
+#include "Session.h"
+#include "Connection.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
using namespace framing;
+// FIXME aconway 2007-08-31: the SessionAdapter should create its
+// private proxy directly on the connections out handler.
+// Session/channel methods should not go thru the other layers.
+// Need to get rid of ChannelAdapter and allow proxies to be created
+// directly on output handlers.
+//
+framing::AMQP_ClientProxy& SessionAdapter::getProxy() {
+ return session->getProxy();
+}
+
SessionAdapter::SessionAdapter(Connection& c, ChannelId ch)
- : connection(c), channel(ch)
+ : connection(c), channel(ch), ignoring(false)
{
- // FIXME aconway 2007-08-29: When we handle session commands,
- // do this on open.
- session.reset(new Session(*this, 0));
+ in = this;
+ out = &c.getOutput();
}
SessionAdapter::~SessionAdapter() {}
+namespace {
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+} // namespace
void SessionAdapter::handle(AMQFrame& f) {
- // FIXME aconway 2007-08-29: handle session commands here, forward
- // other frames.
- session->in(f);
+ // Note on channel states: a channel is open if session != 0. A
+ // channel that is closed (session == 0) can be in the "ignoring"
+ // state. This is a temporary state after we have sent a channel
+ // exception, where extra frames might arrive that should be
+ // ignored.
+ //
+ AMQMethodBody* m=f.getMethod();
+ try {
+ if (m && m->invoke(static_cast<Invocable*>(this)))
+ return;
+ else if (session)
+ session->in(f);
+ else if (!ignoring)
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel << " is not open"));
+ } catch(const ChannelException& e){
+ getProxy().getChannel().close(
+ e.code, e.toString(), classId(m), methodId(m));
+ session.reset();
+ ignoring=true; // Ignore trailing frames sent by client.
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.what(), classId(m), methodId(m));
+ }catch(const std::exception& e){
+ connection.close(
+ framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+ }
+}
+
+void SessionAdapter::assertOpen(const char* method) {
+ if (!session)
+ throw ChannelErrorException(
+ QPID_MSG(""<<method<<" failed: No session for channel "
+ << getChannel()));
+}
+
+void SessionAdapter::assertClosed(const char* method) {
+ // FIXME aconway 2007-08-31: Should raise channel-busy, need
+ // to update spec.
+ if (session)
+ throw PreconditionFailedException(
+ QPID_MSG(""<<method<<" failed: "
+ << channel << " already open on channel "
+ << getChannel()));
+}
+
+void SessionAdapter::open(const string& /*outOfBand*/){
+ assertClosed("open");
+ session.reset(new Session(*this, 0));
+ getProxy().getChannel().openOk();
+}
+
+// FIXME aconway 2007-08-31: flow is no longer in the spec.
+void SessionAdapter::flow(bool active){
+ session->flow(active);
+ getProxy().getChannel().flowOk(active);
+}
+
+void SessionAdapter::flowOk(bool /*active*/){}
+
+void SessionAdapter::close(uint16_t replyCode,
+ const string& replyText,
+ uint16_t classId, uint16_t methodId)
+{
+ // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+ // to text names.
+ QPID_LOG(warning, "Received session.close("<<replyCode<<","
+ <<replyText << ","
+ << "classid=" <<classId<< ","
+ << "methodid=" <<methodId);
+ ignoring=false;
+ getProxy().getChannel().closeOk();
+ // FIXME aconway 2007-08-31: sould reset session BEFORE
+ // sending closeOK to avoid races. SessionAdapter
+ // needs its own private proxy, see getProxy() above.
+ session.reset();
+ // No need to remove from connection map, will be re-used
+ // if channel is re-opened.
+}
+
+void SessionAdapter::closeOk(){
+ ignoring=false;
}
+void SessionAdapter::ok()
+{
+ //no specific action required, generic response handling should be
+ //sufficient
+}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Aug 31 13:51:22 2007
@@ -23,10 +23,15 @@
*/
#include "qpid/framing/FrameHandler.h"
-#include "qpid/broker/Session.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
namespace broker {
class Connection;
@@ -39,7 +44,10 @@
*
* SessionAdapters can be stored in a map by value.
*/
-class SessionAdapter : public framing::FrameHandler
+class SessionAdapter :
+ public framing::FrameHandler::Chains,
+ private framing::FrameHandler,
+ private framing::AMQP_ServerOperations::ChannelHandler
{
public:
SessionAdapter(Connection&, framing::ChannelId);
@@ -56,11 +64,29 @@
framing::ChannelId getChannel() const { return channel; }
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
-
+
private:
+ void assertOpen(const char* method);
+ void assertClosed(const char* method);
+
+ framing::AMQP_ClientProxy& getProxy();
+
+ // FIXME aconway 2007-08-31: Replace channel commands with session.
+ void open(const std::string& outOfBand);
+ void flow(bool active);
+ void flowOk(bool active);
+ void ok( );
+ void ping( );
+ void pong( );
+ void resume( const std::string& channelId );
+ void close(uint16_t replyCode, const
+ std::string& replyText, uint16_t classId, uint16_t methodId);
+ void closeOk();
+
Connection& connection;
const framing::ChannelId channel;
shared_ptr<Session> session;
+ bool ignoring;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Fri Aug 31 13:51:22 2007
@@ -25,7 +25,6 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/broker/BrokerAdapter.h"
#include "qpid/broker/Connection.h"
-#include "qpid/broker/BrokerChannel.h"
#include "qpid/framing/ChannelAdapter.h"
#include <boost/utility/in_place_factory.hpp>
@@ -38,13 +37,13 @@
using namespace broker;
/** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct SessionManager::BrokerHandler :
- public FrameHandler, private ChannelAdapter, private DeliveryAdapter
+struct SessionManager::BrokerHandler : public FrameHandler, private ChannelAdapter
{
Connection connection;
- Channel channel;
+ SessionAdapter sessionAdapter;
+ broker::Session session;
BrokerAdapter adapter;
-
+
// TODO aconway 2007-07-23: Lots of needless flab here (Channel,
// Connection, ChannelAdapter) As these classes are untangled the
// flab can be reduced. The real requirements are:
@@ -55,8 +54,9 @@
//
BrokerHandler(Broker& broker) :
connection(0, broker),
- channel(connection, *this, 1),
- adapter(channel, connection, broker, *this) {}
+ sessionAdapter(connection, 0),
+ session(sessionAdapter, 1),
+ adapter(session, static_cast<ChannelAdapter&>(*this)) {}
void handle(AMQFrame& frame) {
AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.getBody());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Fri Aug 31 13:51:22 2007
@@ -40,4 +40,7 @@
void qpid::framing::AMQContentBody::print(std::ostream& out) const
{
out << "content (" << size() << " bytes)";
+#ifndef NDEBUG
+ out << data.substr(0,10);
+#endif
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Fri Aug 31 13:51:22 2007
@@ -52,7 +52,10 @@
void setChannel(ChannelId c) { channel = c; }
AMQBody* getBody();
- const AMQBody* getBody() const;
+ const AMQBody* getBody() const;
+
+ AMQMethodBody* getMethod() { return getBody()->getMethod(); }
+ const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
/** Copy a body instance to the frame */
void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -36,7 +36,7 @@
ChannelAdapter::ChannelAdapter() : handler(*this), id(0) {}
-void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
+void ChannelAdapter::init(ChannelId i, FrameHandler& out, ProtocolVersion v)
{
assertChannelNotOpen();
id = i;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Fri Aug 31 13:51:22 2007
@@ -29,13 +29,10 @@
#include "ProtocolVersion.h"
#include "amqp_types.h"
#include "FrameHandler.h"
-#include "OutputHandler.h"
namespace qpid {
namespace framing {
-class OutputHandler;
-
/**
* Base class for client and broker channels.
*
@@ -59,7 +56,7 @@
virtual ~ChannelAdapter() {}
/** Initialize the channel adapter. */
- void init(ChannelId, OutputHandler&, ProtocolVersion);
+ void init(ChannelId, FrameHandler&, ProtocolVersion);
FrameHandler::Chains& getHandlers() { return handlers; }
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Fri Aug 31 13:51:22 2007
@@ -18,6 +18,11 @@
* under the License.
*
*/
+
+// FIXME aconway 2007-08-30: Rewrite as a Session test.
+// There is an issue with the tests use of DeliveryAdapter
+// which is no longer exposed on Session (part of SemanticHandler.)
+//
#include "qpid/broker/BrokerChannel.h"
#include "qpid/broker/BrokerQueue.h"
#include "qpid/broker/FanOutExchange.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Aug 31 13:51:22 2007
@@ -28,8 +28,8 @@
TESTS+=Blob
check_PROGRAMS+=Blob
-Blob_SOURCES=Blob.cpp ../qpid/framing/Blob.cpp
-Blob_LDADD=-lboost_unit_test_framework
+Blob_SOURCES=Blob.cpp
+Blob_LDADD=-lboost_unit_test_framework $(lib_common)
TESTS+=logging
check_PROGRAMS+=logging
@@ -80,7 +80,6 @@
# Unit tests
broker_unit_tests = \
AccumulatedAckTest \
- BrokerChannelTest \
DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \