You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/21 20:26:39 UTC
svn commit: r578219 - in /incubator/qpid/trunk/qpid/cpp: rubygen/ src/
src/qpid/broker/
Author: aconway
Date: Fri Sep 21 11:26:37 2007
New Revision: 578219
URL: http://svn.apache.org/viewvc?rev=578219&view=rev
Log:
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains.
broker::SemanticState: session state for the SemanticHandler.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
- copied, changed from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
- copied, changed from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/generate
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/generate
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/generate?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/generate (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/generate Fri Sep 21 11:26:37 2007
@@ -49,9 +49,9 @@
rgen_generator=#{make_continue rgen_generator}
-rgen_client_cpp=#{make_continue(rgen_srcs.grep %r|/qpid/client/.+\.cpp$|)}
+rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))}
-rgen_common_cpp=#{make_continue(rgen_srcs.grep %r|qpid/framing/.+\.cpp$|)}
+rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))}
rgen_srcs=#{make_continue rgen_srcs}
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 21 11:26:37 2007
@@ -187,8 +187,10 @@
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
- qpid/broker/Session.h \
- qpid/broker/Session.cpp \
+ qpid/broker/SemanticState.h \
+ qpid/broker/SemanticState.cpp \
+ qpid/broker/SessionState.h \
+ qpid/broker/SessionState.cpp \
qpid/broker/SessionHandler.h \
qpid/broker/SessionHandler.cpp \
qpid/broker/SemanticHandler.cpp \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Sep 21 11:26:37 2007
@@ -16,8 +16,6 @@
*
*/
#include "BrokerAdapter.h"
-#include "Session.h"
-#include "SessionHandler.h"
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
@@ -38,7 +36,7 @@
// by the handlers responsible for those classes.
//
-BrokerAdapter::BrokerAdapter(Session& s) :
+BrokerAdapter::BrokerAdapter(SemanticState& s) :
HandlerImpl(s),
basicHandler(s),
exchangeHandler(s),
@@ -153,7 +151,7 @@
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = getSession().getQueue(name);
+ Queue::shared_ptr queue = state.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -176,7 +174,7 @@
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getSession().getQueue(name);
+ queue = state.getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -187,7 +185,7 @@
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- getSession().setDefaultQueue(queue);
+ state.setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -216,7 +214,7 @@
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -239,7 +237,7 @@
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -252,12 +250,12 @@
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- getSession().getQueue(queue)->purge();
+ state.getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = getSession().getQueue(queue);
+ Queue::shared_ptr q = state.getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -279,8 +277,8 @@
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -289,8 +287,8 @@
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!consumerTag.empty() && getSession().exists(consumerTag)){
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!consumerTag.empty() && state.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -298,7 +296,7 @@
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+ state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
if(!nowait)
getProxy().getBasic().consumeOk(newTag);
@@ -308,13 +306,13 @@
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- getSession().cancel(consumerTag);
+ state.cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!getSession().get(token, queue, !noAck)){
+ if(!state.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
getProxy().getBasic().getEmpty(clusterId);
@@ -323,9 +321,9 @@
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- getSession().ackCumulative(deliveryTag);
+ state.ackCumulative(deliveryTag);
} else {
- getSession().ackRange(deliveryTag, deliveryTag);
+ state.ackRange(deliveryTag, deliveryTag);
}
}
@@ -333,23 +331,23 @@
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- getSession().startTx();
+ state.startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- getSession().commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- getSession().rollback();
- getSession().recover(false);
+ state.rollback();
+ state.recover(false);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Fri Sep 21 11:26:37 2007
@@ -20,7 +20,7 @@
*/
#include "DtxHandlerImpl.h"
#include "MessageHandlerImpl.h"
-#include "NameGenerator.h"
+
#include "qpid/Exception.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
@@ -44,6 +44,7 @@
class DtxHandler;
class TunnelHandler;
class MessageHandlerImpl;
+class Exchange;
/**
* Per-channel protocol adapter.
@@ -54,16 +55,10 @@
* peer.
*
*/
-
-// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way
-// to group methods as seen by the BADHANDLERs below.
-// Handlers should be grouped by layer, the BrokerAdapter stuff
-// belongs on the SemanticHandler.
-//
class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Session& session);
+ BrokerAdapter(SemanticState& session);
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
@@ -73,7 +68,8 @@
MessageHandler* getMessageHandler() { return &messageHandler; }
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); }
+
+ framing::ProtocolVersion getVersion() const { return session.getVersion();}
AccessHandler* getAccessHandler() {
@@ -99,7 +95,7 @@
public HandlerImpl
{
public:
- ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {}
+ ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void declare(uint16_t ticket,
const std::string& exchange, const std::string& type,
@@ -108,10 +104,13 @@
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- framing::ExchangeQueryResult query(u_int16_t ticket, const string& name);
+ framing::ExchangeQueryResult query(u_int16_t ticket,
+ const std::string& name);
private:
- void checkType(Exchange::shared_ptr exchange, const std::string& type);
- void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
+ void checkType(shared_ptr<Exchange> exchange, const std::string& type);
+
+ void checkAlternate(shared_ptr<Exchange> exchange,
+ shared_ptr<Exchange> alternate);
};
class BindingHandlerImpl :
@@ -119,7 +118,7 @@
public HandlerImpl
{
public:
- BindingHandlerImpl(Session& session) : HandlerImpl(session) {}
+ BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
framing::BindingQueryResult query(u_int16_t ticket,
const std::string& exchange,
@@ -133,7 +132,7 @@
public HandlerImpl
{
public:
- QueueHandlerImpl(Session& session) : HandlerImpl(session) {}
+ QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void declare(uint16_t ticket, const std::string& queue,
const std::string& alternateExchange,
@@ -148,7 +147,7 @@
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- framing::QueueQueryResult query(const string& queue);
+ framing::QueueQueryResult query(const std::string& queue);
void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty);
@@ -159,9 +158,8 @@
public HandlerImpl
{
NameGenerator tagGenerator;
-
public:
- BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {}
+ BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
@@ -181,7 +179,7 @@
public HandlerImpl
{
public:
- TxHandlerImpl(Session& session) : HandlerImpl(session) {}
+ TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void select();
void commit();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Sep 21 11:26:37 2007
@@ -23,7 +23,7 @@
#include <assert.h>
#include "Connection.h"
-#include "Session.h"
+#include "SessionState.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
@@ -52,8 +52,7 @@
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
- SessionHandler sa = getChannel(frame.getChannel());
- sa.in(frame);
+ getChannel(frame.getChannel()).in(frame);
}
}
@@ -94,7 +93,7 @@
if (i != channels.end()) channels.erase(i);
}
-SessionHandler Connection::getChannel(ChannelId id) {
+SessionHandler& Connection::getChannel(ChannelId id) {
boost::optional<SessionHandler>& ch = channels[id];
if (!ch) {
ch = boost::in_place(boost::ref(*this), id);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Sep 21 11:26:37 2007
@@ -35,7 +35,6 @@
#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
#include "qpid/Exception.h"
-#include "Session.h"
#include "ConnectionHandler.h"
#include "SessionHandler.h"
@@ -51,7 +50,7 @@
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
/** Get the SessionHandler for channel. Create if it does not already exist */
- SessionHandler getChannel(framing::ChannelId channel);
+ SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Sep 21 11:26:37 2007
@@ -20,7 +20,7 @@
*/
#include "DeliveryRecord.h"
#include "DeliverableMessage.h"
-#include "Session.h"
+#include "SemanticState.h"
#include "BrokerExchange.h"
#include "qpid/log/Statement.h"
@@ -74,7 +74,7 @@
return range->covers(id);
}
-void DeliveryRecord::redeliver(Session* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) const{
if (!confirmed) {
if(pull){
//if message was originally sent as response to get, we must requeue it
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Sep 21 11:26:37 2007
@@ -34,7 +34,7 @@
namespace qpid {
namespace broker {
-class Session;
+class SemanticState;
/**
* Record of a delivery for which an ack is outstanding.
@@ -61,7 +61,7 @@
void requeue() const;
void release();
void reject();
- void redeliver(Session* const) const;
+ void redeliver(SemanticState* const) const;
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Sep 21 11:26:37 2007
@@ -19,21 +19,20 @@
#include <boost/format.hpp>
#include "Broker.h"
-#include "Session.h"
#include "qpid/framing/constants.h"
using namespace qpid::broker;
using namespace qpid::framing;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {}
+DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
// DtxDemarcationHandler:
void DtxHandlerImpl::select()
{
- getSession().selectDtx();
+ state.selectDtx();
}
DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -43,7 +42,7 @@
{
try {
if (fail) {
- getSession().endDtx(xid, true);
+ state.endDtx(xid, true);
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
@@ -51,9 +50,9 @@
}
} else {
if (suspend) {
- getSession().suspendDtx(xid);
+ state.suspendDtx(xid);
} else {
- getSession().endDtx(xid, false);
+ state.endDtx(xid, false);
}
return DtxDemarcationEndResult(XA_OK);
}
@@ -72,9 +71,9 @@
}
try {
if (resume) {
- getSession().resumeDtx(xid);
+ state.resumeDtx(xid);
} else {
- getSession().startDtx(xid, getBroker().getDtxManager(), join);
+ state.startDtx(xid, getBroker().getDtxManager(), join);
}
return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -32,7 +32,7 @@
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
public:
- DtxHandlerImpl(Session&);
+ DtxHandlerImpl(SemanticState&);
// DtxCoordinationHandler:
@@ -57,8 +57,6 @@
void select();
framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
-
-
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -19,9 +19,8 @@
*
*/
-#include "Session.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SemanticState.h"
+#include "SessionState.h"
namespace qpid {
namespace broker {
@@ -34,26 +33,14 @@
*/
class HandlerImpl {
protected:
- HandlerImpl(Session& s) : session(s) {}
+ SemanticState& state;
+ SessionState& session;
- Session& getSession() { return session; }
- const Session& getSession() const { return session; }
-
- SessionHandler* getSessionHandler() { return session.getHandler(); }
- const SessionHandler* getSessionHandler() const { return session.getHandler(); }
+ HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
- // Remaining functions may only be called if getSessionHandler() != 0
- framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); }
- const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); }
-
- Connection& getConnection() { return getSessionHandler()->getConnection(); }
- const Connection& getConnection() const { return getSessionHandler()->getConnection(); }
-
- Broker& getBroker() { return getConnection().broker; }
- const Broker& getBroker() const { return getConnection().broker; }
-
- private:
- Session& session;
+ framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+ Connection& getConnection() { return session.getConnection(); }
+ Broker& getBroker() { return session.getBroker(); }
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Sep 21 11:26:37 2007
@@ -18,7 +18,6 @@
#include "qpid/QpidError.h"
#include "MessageHandlerImpl.h"
-#include "Session.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
@@ -36,8 +35,7 @@
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Session& session)
- : HandlerImpl(session) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
//
// Message class method handlers
@@ -46,7 +44,7 @@
void
MessageHandlerImpl::cancel(const string& destination )
{
- getSession().cancel(destination);
+ state.cancel(destination);
}
void
@@ -97,14 +95,14 @@
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!destination.empty() && getSession().exists(destination))
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
//NB: am assuming pre-acquired = 0 as discussed on SIG list as is
//the previously expected behaviour
- getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -117,9 +115,9 @@
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
- if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -148,14 +146,14 @@
bool /*global*/ )
{
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void
@@ -166,7 +164,7 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().reject(i->getValue(), (++i)->getValue());
+ state.reject(i->getValue(), (++i)->getValue());
}
}
@@ -175,10 +173,10 @@
if (unit == 0) {
//message
- getSession().addMessageCredit(destination, value);
+ state.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- getSession().addByteCredit(destination, value);
+ state.addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +188,10 @@
{
if (mode == 0) {
//credit
- getSession().setCreditMode(destination);
+ state.setCreditMode(destination);
} else if (mode == 1) {
//window
- getSession().setWindowMode(destination);
+ state.setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -201,12 +199,12 @@
void MessageHandlerImpl::flush(const std::string& destination)
{
- getSession().flush(destination);
+ state.flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- getSession().stop(destination);
+ state.stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,7 +216,7 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().acquire(i->getValue(), (++i)->getValue(), results);
+ state.acquire(i->getValue(), (++i)->getValue(), results);
}
results = results.condense();
@@ -232,7 +230,7 @@
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().release(i->getValue(), (++i)->getValue());
+ state.release(i->getValue(), (++i)->getValue());
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -37,7 +37,7 @@
public HandlerImpl
{
public:
- MessageHandlerImpl(Session&);
+ MessageHandlerImpl(SemanticState&);
void append(const std::string& reference, const std::string& bytes);
@@ -87,8 +87,8 @@
void release(const framing::SequenceNumberSet& transfers);
void subscribe(u_int16_t ticket,
- const string& queue,
- const string& destination,
+ const std::string& queue,
+ const std::string& destination,
bool noLocal,
u_int8_t confirmMode,
u_int8_t acquireMode,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 21 11:26:37 2007
@@ -20,12 +20,12 @@
*/
#include "SemanticHandler.h"
-#include "Session.h"
+#include "SemanticState.h"
#include "SessionHandler.h"
+#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "Connection.h"
-#include "Session.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/InvocationVisitor.h"
@@ -36,7 +36,7 @@
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
+SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {}
void SemanticHandler::handle(framing::AMQFrame& frame)
{
@@ -79,13 +79,13 @@
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- getSession().ackCumulative(mark.getValue());
+ state.ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -95,9 +95,9 @@
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- assert(getSessionHandler());
- getProxy().getExecution().complete(mark.getValue(), range);
+ session.getProxy().getExecution().complete(mark.getValue(), range);
}
+
void SemanticHandler::flush()
{
incoming.flush();
@@ -122,7 +122,7 @@
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
SequenceNumber id = incoming.next();
- BrokerAdapter adapter(getSession());
+ BrokerAdapter adapter(state);
InvocationVisitor v(&adapter);
method->accept(v);
incoming.complete(id);
@@ -130,7 +130,7 @@
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- getProxy().getExecution().result(id.getValue(), v.getResult());
+ session.getProxy().getExecution().result(id.getValue(), v.getResult());
}
//TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
//TODO: if window gets too large send unsolicited completion
@@ -152,8 +152,8 @@
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&getConnection());
- getSession().handle(msg);
+ msg->setPublisher(&session.getConnection());
+ state.handle(msg);
msgBuilder.end();
incoming.track(msg);
//TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
@@ -163,13 +163,17 @@
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(
+ msg, session.getHandler().out,
+ ++outgoing.hwm, token,
+ session.getConnection().getFrameMax());
return outgoing.hwm;
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
+ session.getConnection().getFrameMax());
}
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Sep 21 11:26:37 2007
@@ -44,13 +44,17 @@
namespace broker {
-class Session;
+class SessionState;
class SemanticHandler : public DeliveryAdapter,
- public framing::FrameHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler,
- private HandlerImpl
+ public framing::FrameHandler,
+ public framing::AMQP_ServerOperations::ExecutionHandler
+
{
+ SemanticState state;
+ SessionState& session;
+ // FIXME aconway 2007-09-20: Why are these on the handler rather than the
+ // state?
IncomingExecutionContext incoming;
framing::Window outgoing;
sys::Mutex outLock;
@@ -69,8 +73,12 @@
DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
+ framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+ Connection& getConnection() { return session.getConnection(); }
+ Broker& getBroker() { return session.getBroker(); }
+
public:
- SemanticHandler(Session& session);
+ SemanticHandler(SessionState& session);
//frame handler:
void handle(framing::AMQFrame& frame);
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp&r1=577734&r2=578219&rev=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Sep 21 11:26:37 2007
@@ -19,8 +19,7 @@
*
*/
-#include "Session.h"
-
+#include "SessionState.h"
#include "BrokerAdapter.h"
#include "BrokerQueue.h"
#include "Connection.h"
@@ -56,11 +55,9 @@
using namespace qpid::framing;
using namespace qpid::sys;
-Session::Session(SessionHandler& a, uint32_t t)
- : adapter(&a),
- broker(adapter->getConnection().broker),
- timeout(t),
- id(true),
+SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+ : session(ss),
+ deliveryAdapter(da),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
@@ -69,28 +66,21 @@
flowActive(true)
{
outstanding.reset();
- std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
- // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState.
- deliveryAdapter=semantic.get();
- handlers.push_back(semantic.release());
- in = &handlers[0];
- out = &adapter->out;
- // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
- // create a SessionManager in the broker for all session related
- // stuff: suspended sessions, handler updaters etc.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
- broker.update(a.getChannel(), *this);
}
-Session::~Session() {
- close();
+SemanticState::~SemanticState() {
+ consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
}
-bool Session::exists(const string& consumerTag){
+bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
+void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
bool exclusive, const FieldTable*)
{
@@ -101,7 +91,7 @@
consumers.insert(tagInOut, c.release());
}
-void Session::cancel(const string& tag){
+void SemanticState::cancel(const string& tag){
// consumers is a ptr_map so erase will delete the consumer
// which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
@@ -109,22 +99,13 @@
consumers.erase(i);
}
-void Session::close()
-{
- opened = false;
- consumers.clear();
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
- }
- recover(true);
-}
-void Session::startTx()
+void SemanticState::startTx()
{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Session::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
@@ -135,7 +116,7 @@
}
}
-void Session::rollback()
+void SemanticState::rollback()
{
if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
@@ -143,12 +124,12 @@
accumulatedAck.clear();
}
-void Session::selectDtx()
+void SemanticState::selectDtx()
{
dtxSelected = true;
}
-void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
throw ConnectionException(503, "Session has not been selected for use with dtx");
@@ -162,7 +143,7 @@
}
}
-void Session::endDtx(const std::string& xid, bool fail)
+void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
@@ -183,7 +164,7 @@
dtxBuffer.reset();
}
-void Session::suspendDtx(const std::string& xid)
+void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
@@ -195,7 +176,7 @@
dtxBuffer->setSuspended(true);
}
-void Session::resumeDtx(const std::string& xid)
+void SemanticState::resumeDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
@@ -210,7 +191,7 @@
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
-void Session::checkDtxTimeout()
+void SemanticState::checkDtxTimeout()
{
if (dtxBuffer->isExpired()) {
dtxBuffer.reset();
@@ -218,13 +199,13 @@
}
}
-void Session::record(const DeliveryRecord& delivery)
+void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
delivery.addTo(outstanding);
}
-bool Session::checkPrefetch(Message::shared_ptr& msg)
+bool SemanticState::checkPrefetch(Message::shared_ptr& msg)
{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
@@ -232,7 +213,7 @@
return countOk && sizeOk;
}
-Session::ConsumerImpl::ConsumerImpl(Session* _parent,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
DeliveryToken::shared_ptr _token,
const string& _name,
Queue::shared_ptr _queue,
@@ -253,9 +234,10 @@
msgCredit(0),
byteCredit(0) {}
-bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
+ if (nolocal &&
+ &parent->getSession().getConnection() == msg.payload->getPublisher()) {
return false;
} else {
if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
@@ -266,7 +248,7 @@
Mutex::ScopedLock locker(parent->deliveryLock);
DeliveryId deliveryTag =
- parent->deliveryAdapter->deliver(msg.payload, token);
+ parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
}
@@ -275,7 +257,7 @@
}
}
-bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
{
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
@@ -291,33 +273,34 @@
}
}
-void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
- parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+ parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
}
-Session::ConsumerImpl::~ConsumerImpl() {
+SemanticState::ConsumerImpl::~ConsumerImpl() {
cancel();
}
-void Session::ConsumerImpl::cancel()
+void SemanticState::ConsumerImpl::cancel()
{
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
- parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ parent->getSession().getBroker().getQueues().destroyIf(
+ queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
}
}
}
-void Session::ConsumerImpl::requestDispatch()
+void SemanticState::ConsumerImpl::requestDispatch()
{
if(blocked)
queue->requestDispatch(this);
}
-void Session::handle(Message::shared_ptr msg) {
+void SemanticState::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -329,10 +312,10 @@
}
}
-void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
+ cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -347,17 +330,17 @@
}
-void Session::ackCumulative(DeliveryId id)
+void SemanticState::ackCumulative(DeliveryId id)
{
ack(id, id, true);
}
-void Session::ackRange(DeliveryId first, DeliveryId last)
+void SemanticState::ackRange(DeliveryId first, DeliveryId last)
{
ack(first, last, false);
}
-void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
@@ -373,7 +356,7 @@
++end;
}
- for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -398,7 +381,7 @@
for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
}
-void Session::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::acknowledged(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
@@ -407,7 +390,7 @@
}
}
-void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -415,7 +398,7 @@
}
}
-void Session::recover(bool requeue)
+void SemanticState::recover(bool requeue)
{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
@@ -431,12 +414,12 @@
}
}
-bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
{
QueuedMessage msg = queue->dequeue();
if(msg.payload){
Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
+ DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -446,7 +429,7 @@
}
}
-void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
DeliveryId deliveryTag)
{
ConsumerImplMap::iterator i = consumers.find(consumerTag);
@@ -455,7 +438,7 @@
}
}
-void Session::flow(bool active)
+void SemanticState::flow(bool active)
{
Mutex::ScopedLock locker(deliveryLock);
bool requestDelivery(!flowActive && active);
@@ -467,7 +450,7 @@
}
-Session::ConsumerImpl& Session::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
@@ -477,62 +460,62 @@
}
}
-void Session::setWindowMode(const std::string& destination)
+void SemanticState::setWindowMode(const std::string& destination)
{
find(destination).setWindowMode();
}
-void Session::setCreditMode(const std::string& destination)
+void SemanticState::setCreditMode(const std::string& destination)
{
find(destination).setCreditMode();
}
-void Session::addByteCredit(const std::string& destination, uint32_t value)
+void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
find(destination).addByteCredit(value);
}
-void Session::addMessageCredit(const std::string& destination, uint32_t value)
+void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
find(destination).addMessageCredit(value);
}
-void Session::flush(const std::string& destination)
+void SemanticState::flush(const std::string& destination)
{
ConsumerImpl& c = find(destination);
c.flush();
}
-void Session::stop(const std::string& destination)
+void SemanticState::stop(const std::string& destination)
{
find(destination).stop();
}
-void Session::ConsumerImpl::setWindowMode()
+void SemanticState::ConsumerImpl::setWindowMode()
{
windowing = true;
}
-void Session::ConsumerImpl::setCreditMode()
+void SemanticState::ConsumerImpl::setCreditMode()
{
windowing = false;
}
-void Session::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
byteCredit += value;
requestDispatch();
}
-void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
msgCredit += value;
requestDispatch();
}
-void Session::ConsumerImpl::flush()
+void SemanticState::ConsumerImpl::flush()
{
//need to prevent delivery after requestDispatch returns but
//before credit is reduced to zero; TODO: come up with better
@@ -543,13 +526,13 @@
msgCredit = 0;
}
-void Session::ConsumerImpl::stop()
+void SemanticState::ConsumerImpl::stop()
{
msgCredit = 0;
byteCredit = 0;
}
-Queue::shared_ptr Session::getQueue(const string& name) const {
+Queue::shared_ptr SemanticState::getQueue(const string& name) const {
//Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
Queue::shared_ptr queue;
if (name.empty()) {
@@ -558,14 +541,14 @@
throw NotAllowedException(QPID_MSG("No queue name specified."));
}
else {
- queue = getBroker().getQueues().find(name);
+ queue = session.getBroker().getQueues().find(name);
if (!queue)
throw NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
}
-AckRange Session::findRange(DeliveryId first, DeliveryId last)
+AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
{
ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
@@ -582,21 +565,21 @@
return AckRange(start, end);
}
-void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+void SemanticState::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, AcquireFunctor(acquired));
}
-void Session::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
}
-void Session::reject(DeliveryId first, DeliveryId last)
+void SemanticState::reject(DeliveryId first, DeliveryId last)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h&r1=577734&r2=578219&rev=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Sep 21 11:26:37 2007
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSION_H
-#define QPID_BROKER_SESSION_H
+#ifndef QPID_BROKER_SEMANTICSTATE_H
+#define QPID_BROKER_SEMANTICSTATE_H
/*
*
@@ -37,7 +37,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
#include <list>
#include <vector>
@@ -45,21 +45,19 @@
namespace qpid {
namespace broker {
-class SessionHandler;
-class Broker;
+class SessionState;
/**
- * Session holds the state of an open session, whether attached to a
- * channel or suspended. It also holds the handler chains associated
- * with the session.
+ * SemanticState holds the L3 and L4 state of an open session, whether
+ * attached to a channel or suspended.
*/
-class Session : public framing::FrameHandler::Chains,
- private boost::noncopyable
+class SemanticState : public framing::FrameHandler::Chains,
+ private boost::noncopyable
{
class ConsumerImpl : public Consumer
{
sys::Mutex lock;
- Session* const parent;
+ SemanticState* const parent;
const DeliveryToken::shared_ptr token;
const string name;
const Queue::shared_ptr queue;
@@ -74,7 +72,7 @@
bool checkCredit(Message::shared_ptr& msg);
public:
- ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token,
+ ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
@@ -94,13 +92,8 @@
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
- SessionHandler* adapter;
- Broker& broker;
- uint32_t timeout;
- framing::Uuid id;
- boost::ptr_vector<framing::FrameHandler> handlers;
-
- DeliveryAdapter* deliveryAdapter;
+ SessionState& session;
+ DeliveryAdapter& deliveryAdapter;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
uint32_t prefetchSize;
@@ -113,7 +106,6 @@
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
framing::AccumulatedAck accumulatedAck;
- bool opened;
bool flowActive;
boost::shared_ptr<Exchange> cacheExchange;
@@ -128,19 +120,10 @@
AckRange findRange(DeliveryId first, DeliveryId last);
public:
- Session(SessionHandler&, uint32_t timeout);
- ~Session();
-
- /** Returns 0 if this session is not currently attached */
- SessionHandler* getHandler() { return adapter; }
- const SessionHandler* getHandler() const { return adapter; }
+ SemanticState(DeliveryAdapter&, SessionState&);
+ ~SemanticState();
- Broker& getBroker() const { return broker; }
-
- /** Session timeout, aka detached-lifetime. */
- uint32_t getTimeout() const { return timeout; }
- /** Session ID */
- const framing::Uuid& getId() const { return id; }
+ SessionState& getSession() { return session; }
/**
* Get named queue, never returns 0.
@@ -174,7 +157,6 @@
void stop(const std::string& destination);
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
- void close();
void startTx();
void commit(MessageStore* const store);
void rollback();
@@ -198,4 +180,5 @@
-#endif /*!QPID_BROKER_SESSION_H*/
+
+#endif /*!QPID_BROKER_SEMANTICSTATE_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 21 11:26:37 2007
@@ -19,7 +19,7 @@
*/
#include "SessionHandler.h"
-#include "Session.h"
+#include "SessionState.h"
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
@@ -94,7 +94,7 @@
void SessionHandler::open(uint32_t detachedLifetime) {
assertClosed("open");
- session.reset(new Session(*this, detachedLifetime));
+ session.reset(new SessionState(*this, detachedLifetime));
getProxy().getSession().attached(session->getId(), session->getTimeout());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Sep 21 11:26:37 2007
@@ -31,7 +31,7 @@
namespace broker {
class Connection;
-class Session;
+class SessionState;
/**
* A SessionHandler is associated with each active channel. It
@@ -48,8 +48,8 @@
~SessionHandler();
/** Returns 0 if not attached to a session */
- Session* getSession() { return session.get(); }
- const Session* getSession() const { return session.get(); }
+ SessionState* getSession() { return session.get(); }
+ const SessionState* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel; }
@@ -84,7 +84,7 @@
Connection& connection;
const framing::ChannelId channel;
framing::AMQP_ClientProxy proxy;
- shared_ptr<Session> session;
+ shared_ptr<SessionState> session;
bool ignoring;
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=578219&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Sep 21 11:26:37 2007
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "SessionHandler.h"
+#include "Connection.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+SessionState::SessionState(SessionHandler& h, uint32_t timeout_)
+ : handler(&h), id(true), timeout(timeout_),
+ broker(h.getConnection().broker),
+ version(h.getConnection().getVersion())
+{
+ // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
+ chain.push_back(new SemanticHandler(*this));
+ in = &chain[0]; // Incoming frame to handler chain.
+ out = &handler->out; // Outgoing frames to SessionHandler
+
+ // FIXME aconway 2007-09-20: use broker to add plugin
+ // handlers to the chain.
+ // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
+ broker.update(handler->getChannel(), *this);
+}
+
+SessionHandler& SessionState::getHandler() {
+ assert(isAttached());
+ return *handler;
+}
+
+AMQP_ClientProxy& SessionState::getProxy() {
+ return getHandler().getProxy();
+}
+ /** Convenience for: getHandler()->getConnection()
+ *@pre getHandler() != 0
+ */
+Connection& SessionState::getConnection() {
+ return getHandler().getConnection();
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 21 11:26:37 2007
@@ -23,44 +23,73 @@
*/
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/noncopyable.hpp>
+
namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
namespace broker {
+class SessionHandler;
+class Broker;
+class Connection;
+
/**
* State of a session.
+ *
+ * An attached session has a SessionHandler which is attached to a
+ * connection. A suspended session has no handler.
+ *
+ * A SessionState is always associated with an open session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * The SessionState includes the sessions handler chains, which may
+ * themselves have state. The handlers will be preserved as long as
+ * the session is alive.
*/
-class SessionState
+class SessionState : public framing::FrameHandler::Chains,
+ private boost::noncopyable
{
public:
- enum State { CLOSED, ACTIVE, SUSPENDED };
+ /** SessionState for a newly opened connection. */
+ SessionState(SessionHandler& h, uint32_t timeout_);
- /** Initially in CLOSED state */
- SessionState() : id(false), state(CLOSED), timeout(0) {}
+ bool isAttached() { return handler; }
- /** Make CLOSED session ACTIVE, assigns a new UUID.
- * #@param timeout in seconds
- */
- void open(u_int32_t timeout_) {
- state=ACTIVE; id.generate(); timeout=timeout_;
- }
+ /** @pre isAttached() */
+ SessionHandler& getHandler();
- /** Close a session. */
- void close() { state=CLOSED; id.clear(); timeout=0; }
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ Connection& getConnection();
- State getState() const { return state; }
const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
-
- bool isOpen() { return state == ACTIVE; }
- bool isClosed() { return state == CLOSED; }
- bool isSuspended() { return state == SUSPENDED; }
+ Broker& getBroker() { return broker; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
private:
- friend class SuspendedSessions;
+ friend class SessionHandler; // Only SessionHandler can attach/detach
+ void detach() { handler=0; }
+ void attach(SessionHandler& h) { handler = &h; }
+
+ SessionHandler* handler;
framing::Uuid id;
- State state;
uint32_t timeout;
+ Broker& broker;
+ boost::ptr_vector<framing::FrameHandler> chain;
+ framing::ProtocolVersion version;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h Fri Sep 21 11:26:37 2007
@@ -31,8 +31,10 @@
namespace qpid {
namespace broker {
-/** Collection of suspended sessions.
- * Thread safe.
+/**
+ * Thread safe collection of suspended sessions.
+ * Every session is owned either by a connection's SessionHandler
+ * or by the SuspendedSessions.
*/
class SuspendedSessions {
typedef std::multimap<sys::AbsTime,SessionState> Map;