You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/09/21 20:26:39 UTC

svn commit: r578219 - in /incubator/qpid/trunk/qpid/cpp: rubygen/ src/ src/qpid/broker/

Author: aconway
Date: Fri Sep 21 11:26:37 2007
New Revision: 578219

URL: http://svn.apache.org/viewvc?rev=578219&view=rev
Log:

Split broker::Session into:
  broker::SessionState: session info (uuid etc.) + handler chains.
  broker::SemanticState: session state for the SemanticHandler.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
      - copied, changed from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
      - copied, changed from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/generate
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/generate
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/generate?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/generate (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/generate Fri Sep 21 11:26:37 2007
@@ -49,9 +49,9 @@
 
 rgen_generator=#{make_continue rgen_generator}
 
-rgen_client_cpp=#{make_continue(rgen_srcs.grep %r|/qpid/client/.+\.cpp$|)}
+rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))}
 
-rgen_common_cpp=#{make_continue(rgen_srcs.grep %r|qpid/framing/.+\.cpp$|)}
+rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))}
 
 rgen_srcs=#{make_continue rgen_srcs}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 21 11:26:37 2007
@@ -187,8 +187,10 @@
   qpid/broker/RecoveryManagerImpl.cpp \
   qpid/broker/RecoveredEnqueue.cpp \
   qpid/broker/RecoveredDequeue.cpp \
-  qpid/broker/Session.h \
-  qpid/broker/Session.cpp \
+  qpid/broker/SemanticState.h \
+  qpid/broker/SemanticState.cpp \
+  qpid/broker/SessionState.h \
+  qpid/broker/SessionState.cpp \
   qpid/broker/SessionHandler.h \
   qpid/broker/SessionHandler.cpp \
   qpid/broker/SemanticHandler.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Sep 21 11:26:37 2007
@@ -16,8 +16,6 @@
  *
  */
 #include "BrokerAdapter.h"
-#include "Session.h"
-#include "SessionHandler.h"
 #include "Connection.h"
 #include "DeliveryToken.h"
 #include "MessageDelivery.h"
@@ -38,7 +36,7 @@
 // by the handlers responsible for those classes.
 //
 
-BrokerAdapter::BrokerAdapter(Session& s) :
+BrokerAdapter::BrokerAdapter(SemanticState& s) :
     HandlerImpl(s),
     basicHandler(s),
     exchangeHandler(s),
@@ -153,7 +151,7 @@
 
 QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
 {
-    Queue::shared_ptr queue = getSession().getQueue(name);
+    Queue::shared_ptr queue = state.getQueue(name);
     Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
 
     return QueueQueryResult(queue->getName(), 
@@ -176,7 +174,7 @@
     }
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-	queue = getSession().getQueue(name);
+	queue = state.getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
@@ -187,7 +185,7 @@
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
-	    getSession().setDefaultQueue(queue);
+	    state.setDefaultQueue(queue);
             if (alternate) {
                 queue->setAlternateExchange(alternate);
                 alternate->incAlternateUsers();
@@ -216,7 +214,7 @@
                                            const string& exchangeName, const string& routingKey, 
                                            const FieldTable& arguments){
 
-    Queue::shared_ptr queue = getSession().getQueue(queueName);
+    Queue::shared_ptr queue = state.getQueue(queueName);
     Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
     if(exchange){
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -239,7 +237,7 @@
                                         const string& routingKey,
                                         const qpid::framing::FieldTable& arguments )
 {
-    Queue::shared_ptr queue = getSession().getQueue(queueName);
+    Queue::shared_ptr queue = state.getQueue(queueName);
     if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
 
     Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -252,12 +250,12 @@
 }
         
 void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
-    getSession().getQueue(queue)->purge();
+    state.getQueue(queue)->purge();
 } 
         
 void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
     ChannelException error(0, "");
-    Queue::shared_ptr q = getSession().getQueue(queue);
+    Queue::shared_ptr q = state.getQueue(queue);
     if(ifEmpty && q->getMessageCount() > 0){
         throw PreconditionFailedException("Queue not empty.");
     }else if(ifUnused && q->getConsumerCount() > 0){
@@ -279,8 +277,8 @@
 
 void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
     //TODO: handle global
-    getSession().setPrefetchSize(prefetchSize);
-    getSession().setPrefetchCount(prefetchCount);
+    state.setPrefetchSize(prefetchSize);
+    state.setPrefetchCount(prefetchCount);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, 
@@ -289,8 +287,8 @@
                                               bool nowait, const FieldTable& fields)
 {
     
-    Queue::shared_ptr queue = getSession().getQueue(queueName);    
-    if(!consumerTag.empty() && getSession().exists(consumerTag)){
+    Queue::shared_ptr queue = state.getQueue(queueName);    
+    if(!consumerTag.empty() && state.exists(consumerTag)){
         throw ConnectionException(530, "Consumer tags must be unique");
     }
     string newTag = consumerTag;
@@ -298,7 +296,7 @@
     //also version specific behaviour now)
     if (newTag.empty()) newTag = tagGenerator.generate();
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
-    getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+    state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
 
     if(!nowait)
         getProxy().getBasic().consumeOk(newTag);
@@ -308,13 +306,13 @@
 } 
         
 void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
-    getSession().cancel(consumerTag);
+    state.cancel(consumerTag);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
-    Queue::shared_ptr queue = getSession().getQueue(queueName);    
+    Queue::shared_ptr queue = state.getQueue(queueName);    
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
-    if(!getSession().get(token, queue, !noAck)){
+    if(!state.get(token, queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
 
         getProxy().getBasic().getEmpty(clusterId);
@@ -323,9 +321,9 @@
         
 void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
     if (multiple) {
-        getSession().ackCumulative(deliveryTag);
+        state.ackCumulative(deliveryTag);
     } else {
-        getSession().ackRange(deliveryTag, deliveryTag);
+        state.ackRange(deliveryTag, deliveryTag);
     }
 } 
         
@@ -333,23 +331,23 @@
         
 void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
 {
-    getSession().recover(requeue);
+    state.recover(requeue);
 } 
 
 void BrokerAdapter::TxHandlerImpl::select()
 {
-    getSession().startTx();
+    state.startTx();
 }
 
 void BrokerAdapter::TxHandlerImpl::commit()
 {
-    getSession().commit(&getBroker().getStore());
+    state.commit(&getBroker().getStore());
 }
 
 void BrokerAdapter::TxHandlerImpl::rollback()
 {    
-    getSession().rollback();
-    getSession().recover(false);    
+    state.rollback();
+    state.recover(false);    
 }
               
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Fri Sep 21 11:26:37 2007
@@ -20,7 +20,7 @@
  */
 #include "DtxHandlerImpl.h"
 #include "MessageHandlerImpl.h"
-#include "NameGenerator.h"
+
 #include "qpid/Exception.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -44,6 +44,7 @@
 class DtxHandler;
 class TunnelHandler;
 class MessageHandlerImpl;
+class Exchange;
 
 /**
  * Per-channel protocol adapter.
@@ -54,16 +55,10 @@
  * peer.
  * 
  */
-
-// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way
-// to group methods as seen by the BADHANDLERs below.
-// Handlers should be grouped by layer, the BrokerAdapter stuff
-// belongs on the SemanticHandler.
-// 
 class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
 {
   public:
-    BrokerAdapter(Session& session);
+    BrokerAdapter(SemanticState& session);
 
     BasicHandler* getBasicHandler() { return &basicHandler; }
     ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
@@ -73,7 +68,8 @@
     MessageHandler* getMessageHandler() { return &messageHandler;  }
     DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
     DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
-    framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); }
+
+    framing::ProtocolVersion getVersion() const { return session.getVersion();}
 
 
     AccessHandler* getAccessHandler() {
@@ -99,7 +95,7 @@
         public HandlerImpl
     {
       public:
-        ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {}
+        ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
         
         void declare(uint16_t ticket,
                      const std::string& exchange, const std::string& type,
@@ -108,10 +104,13 @@
                      const qpid::framing::FieldTable& arguments); 
         void delete_(uint16_t ticket,
                      const std::string& exchange, bool ifUnused); 
-        framing::ExchangeQueryResult query(u_int16_t ticket, const string& name);
+        framing::ExchangeQueryResult query(u_int16_t ticket,
+                                           const std::string& name);
       private:
-        void checkType(Exchange::shared_ptr exchange, const std::string& type);
-        void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
+        void checkType(shared_ptr<Exchange> exchange, const std::string& type);
+
+        void checkAlternate(shared_ptr<Exchange> exchange,
+                            shared_ptr<Exchange> alternate);
     };
 
     class BindingHandlerImpl : 
@@ -119,7 +118,7 @@
             public HandlerImpl
     {
     public:
-        BindingHandlerImpl(Session& session) : HandlerImpl(session) {}
+        BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
 
         framing::BindingQueryResult query(u_int16_t ticket,
                                           const std::string& exchange,
@@ -133,7 +132,7 @@
         public HandlerImpl
     {
       public:
-        QueueHandlerImpl(Session& session) : HandlerImpl(session) {}
+        QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
         
         void declare(uint16_t ticket, const std::string& queue,
                      const std::string& alternateExchange, 
@@ -148,7 +147,7 @@
                     const std::string& exchange,
                     const std::string& routingKey,
                     const qpid::framing::FieldTable& arguments );
-        framing::QueueQueryResult query(const string& queue);
+        framing::QueueQueryResult query(const std::string& queue);
         void purge(uint16_t ticket, const std::string& queue); 
         void delete_(uint16_t ticket, const std::string& queue,
                      bool ifUnused, bool ifEmpty);
@@ -159,9 +158,8 @@
         public HandlerImpl
     {
         NameGenerator tagGenerator;
-
       public:
-        BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {}
+        BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
 
         void qos(uint32_t prefetchSize,
                  uint16_t prefetchCount, bool global); 
@@ -181,7 +179,7 @@
         public HandlerImpl
     {
       public:
-        TxHandlerImpl(Session& session) : HandlerImpl(session) {}
+        TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
         
         void select();
         void commit();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Sep 21 11:26:37 2007
@@ -23,7 +23,7 @@
 #include <assert.h>
 
 #include "Connection.h"
-#include "Session.h"
+#include "SessionState.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "BrokerAdapter.h"
 #include "SemanticHandler.h"
@@ -52,8 +52,7 @@
     if (frame.getChannel() == 0) {
         adapter.handle(frame);
     } else {
-        SessionHandler sa = getChannel(frame.getChannel());
-        sa.in(frame);
+        getChannel(frame.getChannel()).in(frame);
     }
 }
 
@@ -94,7 +93,7 @@
     if (i != channels.end()) channels.erase(i);
 }
 
-SessionHandler Connection::getChannel(ChannelId id) {
+SessionHandler& Connection::getChannel(ChannelId id) {
     boost::optional<SessionHandler>& ch = channels[id];
     if (!ch) {
         ch = boost::in_place(boost::ref(*this), id); 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Sep 21 11:26:37 2007
@@ -35,7 +35,6 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "Broker.h"
 #include "qpid/Exception.h"
-#include "Session.h"
 #include "ConnectionHandler.h"
 #include "SessionHandler.h"
 
@@ -51,7 +50,7 @@
     Connection(sys::ConnectionOutputHandler* out, Broker& broker);
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
-    SessionHandler getChannel(framing::ChannelId channel);
+    SessionHandler& getChannel(framing::ChannelId channel);
 
     /** Close the connection */
     void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Sep 21 11:26:37 2007
@@ -20,7 +20,7 @@
  */
 #include "DeliveryRecord.h"
 #include "DeliverableMessage.h"
-#include "Session.h"
+#include "SemanticState.h"
 #include "BrokerExchange.h"
 #include "qpid/log/Statement.h"
 
@@ -74,7 +74,7 @@
     return range->covers(id);
 }
 
-void DeliveryRecord::redeliver(Session* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) const{
     if (!confirmed) {
         if(pull){
             //if message was originally sent as response to get, we must requeue it

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Sep 21 11:26:37 2007
@@ -34,7 +34,7 @@
 
 namespace qpid {
 namespace broker {
-class Session;
+class SemanticState;
 
 /**
  * Record of a delivery for which an ack is outstanding.
@@ -61,7 +61,7 @@
     void requeue() const;
     void release();
     void reject();
-    void redeliver(Session* const) const;
+    void redeliver(SemanticState* const) const;
     void updateByteCredit(uint32_t& credit) const;
     void addTo(Prefetch&) const;
     void subtractFrom(Prefetch&) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Sep 21 11:26:37 2007
@@ -19,21 +19,20 @@
 
 #include <boost/format.hpp>
 #include "Broker.h"
-#include "Session.h"
 #include "qpid/framing/constants.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
 using std::string;
 
-DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {}
+DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
 
 // DtxDemarcationHandler:
 
 
 void DtxHandlerImpl::select()
 {
-    getSession().selectDtx();
+    state.selectDtx();
 }
 
 DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -43,7 +42,7 @@
 {
     try {
         if (fail) {
-            getSession().endDtx(xid, true);
+            state.endDtx(xid, true);
             if (suspend) {
                 throw ConnectionException(503, "End and suspend cannot both be set.");
             } else {
@@ -51,9 +50,9 @@
             }
         } else {
             if (suspend) {
-                getSession().suspendDtx(xid);
+                state.suspendDtx(xid);
             } else {
-                getSession().endDtx(xid, false);
+                state.endDtx(xid, false);
             }
             return DtxDemarcationEndResult(XA_OK);
         }
@@ -72,9 +71,9 @@
     }
     try {
         if (resume) {
-            getSession().resumeDtx(xid);
+            state.resumeDtx(xid);
         } else {
-            getSession().startDtx(xid, getBroker().getDtxManager(), join);
+            state.startDtx(xid, getBroker().getDtxManager(), join);
         }
         return DtxDemarcationStartResult(XA_OK);
     } catch (const DtxTimeoutException& e) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -32,7 +32,7 @@
       public framing::AMQP_ServerOperations::DtxDemarcationHandler
 {    
 public:
-    DtxHandlerImpl(Session&);
+    DtxHandlerImpl(SemanticState&);
 
     // DtxCoordinationHandler:
 
@@ -57,8 +57,6 @@
     void select();
     
     framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
-
-
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -19,9 +19,8 @@
  *
  */
 
-#include "Session.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SemanticState.h"
+#include "SessionState.h"
 
 namespace qpid {
 namespace broker {
@@ -34,26 +33,14 @@
  */
 class HandlerImpl {
   protected:
-    HandlerImpl(Session& s) : session(s) {}
+    SemanticState& state;
+    SessionState& session;
 
-    Session& getSession() { return session; }
-    const Session& getSession() const { return session; }
-    
-    SessionHandler* getSessionHandler() { return session.getHandler(); }
-    const SessionHandler* getSessionHandler() const { return session.getHandler(); }
+    HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
 
-    // Remaining functions may only be called if getSessionHandler() != 0
-    framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); }
-    const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); }
-
-    Connection& getConnection() { return getSessionHandler()->getConnection(); }
-    const Connection& getConnection() const { return getSessionHandler()->getConnection(); }
-    
-    Broker& getBroker() { return getConnection().broker; }
-    const Broker& getBroker() const { return getConnection().broker; }
-
-  private:
-    Session& session;
+    framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+    Connection& getConnection() { return session.getConnection(); }
+    Broker& getBroker() { return session.getBroker(); }
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Sep 21 11:26:37 2007
@@ -18,7 +18,6 @@
 
 #include "qpid/QpidError.h"
 #include "MessageHandlerImpl.h"
-#include "Session.h"
 #include "qpid/framing/FramingContent.h"
 #include "Connection.h"
 #include "Broker.h"
@@ -36,8 +35,7 @@
 
 using namespace framing;
 
-MessageHandlerImpl::MessageHandlerImpl(Session& session)
-    : HandlerImpl(session) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
 
 //
 // Message class method handlers
@@ -46,7 +44,7 @@
 void
 MessageHandlerImpl::cancel(const string& destination )
 {
-    getSession().cancel(destination);
+    state.cancel(destination);
 }
 
 void
@@ -97,14 +95,14 @@
                             bool exclusive,
                             const framing::FieldTable& filter )
 {
-    Queue::shared_ptr queue = getSession().getQueue(queueName);
-    if(!destination.empty() && getSession().exists(destination))
+    Queue::shared_ptr queue = state.getQueue(queueName);
+    if(!destination.empty() && state.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
 
     string tag = destination;
     //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
     //the previously expected behaviour
-    getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
+    state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
                     tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
@@ -117,9 +115,9 @@
                          const string& destination,
                          bool noAck )
 {
-    Queue::shared_ptr queue = getSession().getQueue(queueName);
+    Queue::shared_ptr queue = state.getQueue(queueName);
     
-    if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+    if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
         //don't send any response... rely on execution completion
     } else {
         //temporarily disabled:
@@ -148,14 +146,14 @@
                         bool /*global*/ )
 {
     //TODO: handle global
-    getSession().setPrefetchSize(prefetchSize);
-    getSession().setPrefetchCount(prefetchCount);
+    state.setPrefetchSize(prefetchSize);
+    state.setPrefetchCount(prefetchCount);
 }
 
 void
 MessageHandlerImpl::recover(bool requeue)
 {
-    getSession().recover(requeue);
+    state.recover(requeue);
 }
 
 void
@@ -166,7 +164,7 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        getSession().reject(i->getValue(), (++i)->getValue());
+        state.reject(i->getValue(), (++i)->getValue());
     }
 }
 
@@ -175,10 +173,10 @@
     
     if (unit == 0) {
         //message
-        getSession().addMessageCredit(destination, value);
+        state.addMessageCredit(destination, value);
     } else if (unit == 1) {
         //bytes
-        getSession().addByteCredit(destination, value);
+        state.addByteCredit(destination, value);
     } else {
         //unknown
         throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +188,10 @@
 {
     if (mode == 0) {
         //credit
-        getSession().setCreditMode(destination);
+        state.setCreditMode(destination);
     } else if (mode == 1) {
         //window
-        getSession().setWindowMode(destination);
+        state.setWindowMode(destination);
     } else{
         throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);        
     }
@@ -201,12 +199,12 @@
     
 void MessageHandlerImpl::flush(const std::string& destination)
 {
-    getSession().flush(destination);        
+    state.flush(destination);        
 }
 
 void MessageHandlerImpl::stop(const std::string& destination)
 {
-    getSession().stop(destination);        
+    state.stop(destination);        
 }
 
 void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,7 +216,7 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        getSession().acquire(i->getValue(), (++i)->getValue(), results);
+        state.acquire(i->getValue(), (++i)->getValue(), results);
     }
 
     results = results.condense();
@@ -232,7 +230,7 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        getSession().release(i->getValue(), (++i)->getValue());
+        state.release(i->getValue(), (++i)->getValue());
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Fri Sep 21 11:26:37 2007
@@ -37,7 +37,7 @@
         public HandlerImpl
 {
   public:
-    MessageHandlerImpl(Session&);
+    MessageHandlerImpl(SemanticState&);
 
     void append(const std::string& reference, const std::string& bytes);
 
@@ -87,8 +87,8 @@
     void release(const framing::SequenceNumberSet& transfers);
 
     void subscribe(u_int16_t ticket,
-                   const string& queue,
-                   const string& destination,
+                   const std::string& queue,
+                   const std::string& destination,
                    bool noLocal,
                    u_int8_t confirmMode,
                    u_int8_t acquireMode,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 21 11:26:37 2007
@@ -20,12 +20,12 @@
  */
 
 #include "SemanticHandler.h"
-#include "Session.h"
+#include "SemanticState.h"
 #include "SessionHandler.h"
+#include "SessionState.h"
 #include "BrokerAdapter.h"
 #include "MessageDelivery.h"
 #include "Connection.h"
-#include "Session.h"
 #include "qpid/framing/ExecutionCompleteBody.h"
 #include "qpid/framing/ExecutionResultBody.h"
 #include "qpid/framing/InvocationVisitor.h"
@@ -36,7 +36,7 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
+SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {}
 
 void SemanticHandler::handle(framing::AMQFrame& frame) 
 {    
@@ -79,13 +79,13 @@
     if (outgoing.lwm < mark) {
         outgoing.lwm = mark;
         //ack messages:
-        getSession().ackCumulative(mark.getValue());
+        state.ackCumulative(mark.getValue());
     }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
         for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
-            getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+            state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
         }
     }
 }
@@ -95,9 +95,9 @@
     SequenceNumber mark = incoming.getMark();
     SequenceNumberSet range = incoming.getRange();
     Mutex::ScopedLock l(outLock);
-    assert(getSessionHandler()); 
-    getProxy().getExecution().complete(mark.getValue(), range);
+    session.getProxy().getExecution().complete(mark.getValue(), range);
 }
+
 void SemanticHandler::flush()
 {
     incoming.flush();
@@ -122,7 +122,7 @@
 void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
 {
     SequenceNumber id = incoming.next();
-    BrokerAdapter adapter(getSession());
+    BrokerAdapter adapter(state);
     InvocationVisitor v(&adapter);
     method->accept(v);
     incoming.complete(id);                                    
@@ -130,7 +130,7 @@
     if (!v.wasHandled()) {
         throw ConnectionException(540, "Not implemented");
     } else if (v.hasResult()) {
-        getProxy().getExecution().result(id.getValue(), v.getResult());
+        session.getProxy().getExecution().result(id.getValue(), v.getResult());
     }
     //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
     //TODO: if window gets too large send unsolicited completion
@@ -152,8 +152,8 @@
     }
     msgBuilder.handle(frame);
     if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
-        msg->setPublisher(&getConnection());
-        getSession().handle(msg);        
+        msg->setPublisher(&session.getConnection());
+        state.handle(msg);        
         msgBuilder.end();
         incoming.track(msg);
         //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
@@ -163,13 +163,17 @@
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
 {
     Mutex::ScopedLock l(outLock);
-    MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
+    MessageDelivery::deliver(
+        msg, session.getHandler().out,
+        ++outgoing.hwm, token,
+        session.getConnection().getFrameMax());
     return outgoing.hwm;
 }
 
 void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
 {
-    MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
+    MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
+                             session.getConnection().getFrameMax());
 }
 
 SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Sep 21 11:26:37 2007
@@ -44,13 +44,17 @@
 
 namespace broker {
 
-class Session;
+class SessionState;
 
 class SemanticHandler : public DeliveryAdapter,
-                        public framing::FrameHandler, 
-                        public framing::AMQP_ServerOperations::ExecutionHandler,
-                        private HandlerImpl
+                        public framing::FrameHandler,
+                        public framing::AMQP_ServerOperations::ExecutionHandler
+    
 {
+    SemanticState state;
+    SessionState& session;
+    // FIXME aconway 2007-09-20: Why are these on the handler rather than the
+    // state?
     IncomingExecutionContext incoming;
     framing::Window outgoing;
     sys::Mutex outLock;
@@ -69,8 +73,12 @@
     DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
     void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
 
+    framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+    Connection& getConnection() { return session.getConnection(); }
+    Broker& getBroker() { return session.getBroker(); }
+
 public:
-    SemanticHandler(Session& session);
+    SemanticHandler(SessionState& session);
 
     //frame handler:
     void handle(framing::AMQFrame& frame);

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp&r1=577734&r2=578219&rev=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Sep 21 11:26:37 2007
@@ -19,8 +19,7 @@
  *
  */
 
-#include "Session.h"
-
+#include "SessionState.h"
 #include "BrokerAdapter.h"
 #include "BrokerQueue.h"
 #include "Connection.h"
@@ -56,11 +55,9 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-Session::Session(SessionHandler& a, uint32_t t)
-    : adapter(&a),
-      broker(adapter->getConnection().broker),
-      timeout(t),
-      id(true),
+SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+    : session(ss),
+      deliveryAdapter(da),
       prefetchSize(0),
       prefetchCount(0),
       tagGenerator("sgen"),
@@ -69,28 +66,21 @@
       flowActive(true)
 {
     outstanding.reset();
-    std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
-    // FIXME aconway 2007-08-29:  move deliveryAdapter to SemanticHandlerState. 
-    deliveryAdapter=semantic.get();
-    handlers.push_back(semantic.release());
-    in = &handlers[0];
-    out = &adapter->out;
-    // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
-    // create a SessionManager in the broker for all session related
-    // stuff: suspended sessions, handler updaters etc.
-    // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
-    broker.update(a.getChannel(), *this);       
 }
 
-Session::~Session() {
-    close();
+SemanticState::~SemanticState() {
+    consumers.clear();
+    if (dtxBuffer.get()) {
+        dtxBuffer->fail();
+    }
+    recover(true);
 }
 
-bool Session::exists(const string& consumerTag){
+bool SemanticState::exists(const string& consumerTag){
     return consumers.find(consumerTag) != consumers.end();
 }
 
-void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
+void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
                       Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
                       bool exclusive, const FieldTable*)
 {
@@ -101,7 +91,7 @@
     consumers.insert(tagInOut, c.release());
 }
 
-void Session::cancel(const string& tag){
+void SemanticState::cancel(const string& tag){
     // consumers is a ptr_map so erase will delete the consumer
     // which will call cancel.
     ConsumerImplMap::iterator i = consumers.find(tag);
@@ -109,22 +99,13 @@
         consumers.erase(i); 
 }
 
-void Session::close()
-{
-    opened = false;
-    consumers.clear();
-    if (dtxBuffer.get()) {
-        dtxBuffer->fail();
-    }
-    recover(true);
-}
 
-void Session::startTx()
+void SemanticState::startTx()
 {
     txBuffer = TxBuffer::shared_ptr(new TxBuffer());
 }
 
-void Session::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store)
 {
     if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
 
@@ -135,7 +116,7 @@
     }
 }
 
-void Session::rollback()
+void SemanticState::rollback()
 {
     if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
 
@@ -143,12 +124,12 @@
     accumulatedAck.clear();
 }
 
-void Session::selectDtx()
+void SemanticState::selectDtx()
 {
     dtxSelected = true;
 }
 
-void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
 {
     if (!dtxSelected) {
         throw ConnectionException(503, "Session has not been selected for use with dtx");
@@ -162,7 +143,7 @@
     }
 }
 
-void Session::endDtx(const std::string& xid, bool fail)
+void SemanticState::endDtx(const std::string& xid, bool fail)
 {
     if (!dtxBuffer) {
         throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
@@ -183,7 +164,7 @@
     dtxBuffer.reset();
 }
 
-void Session::suspendDtx(const std::string& xid)
+void SemanticState::suspendDtx(const std::string& xid)
 {
     if (dtxBuffer->getXid() != xid) {
         throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") 
@@ -195,7 +176,7 @@
     dtxBuffer->setSuspended(true);
 }
 
-void Session::resumeDtx(const std::string& xid)
+void SemanticState::resumeDtx(const std::string& xid)
 {
     if (dtxBuffer->getXid() != xid) {
         throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") 
@@ -210,7 +191,7 @@
     txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
 }
 
-void Session::checkDtxTimeout()
+void SemanticState::checkDtxTimeout()
 {
     if (dtxBuffer->isExpired()) {
         dtxBuffer.reset();
@@ -218,13 +199,13 @@
     }
 }
 
-void Session::record(const DeliveryRecord& delivery)
+void SemanticState::record(const DeliveryRecord& delivery)
 {
     unacked.push_back(delivery);
     delivery.addTo(outstanding);
 }
 
-bool Session::checkPrefetch(Message::shared_ptr& msg)
+bool SemanticState::checkPrefetch(Message::shared_ptr& msg)
 {
     Mutex::ScopedLock locker(deliveryLock);
     bool countOk = !prefetchCount || prefetchCount > unacked.size();
@@ -232,7 +213,7 @@
     return countOk && sizeOk;
 }
 
-Session::ConsumerImpl::ConsumerImpl(Session* _parent, 
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, 
                                     DeliveryToken::shared_ptr _token,
                                     const string& _name, 
                                     Queue::shared_ptr _queue, 
@@ -253,9 +234,10 @@
     msgCredit(0), 
     byteCredit(0) {}
 
-bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
 {
-    if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
+    if (nolocal &&
+        &parent->getSession().getConnection() == msg.payload->getPublisher()) {
         return false;
     } else {
         if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
@@ -266,7 +248,7 @@
             Mutex::ScopedLock locker(parent->deliveryLock);
 
             DeliveryId deliveryTag =
-                parent->deliveryAdapter->deliver(msg.payload, token);
+                parent->deliveryAdapter.deliver(msg.payload, token);
             if (windowing || ackExpected) {
                 parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
             }
@@ -275,7 +257,7 @@
     }
 }
 
-bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
 {
     Mutex::ScopedLock l(lock);
     if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
@@ -291,33 +273,34 @@
     }
 }
 
-void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
     Mutex::ScopedLock locker(parent->deliveryLock);
-    parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+    parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
 }
 
-Session::ConsumerImpl::~ConsumerImpl() {
+SemanticState::ConsumerImpl::~ConsumerImpl() {
     cancel();
 }
 
-void Session::ConsumerImpl::cancel()
+void SemanticState::ConsumerImpl::cancel()
 {
     if(queue) {
         queue->cancel(this);
         if (queue->canAutoDelete()) {            
-            parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(), 
-                                                                               boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+            parent->getSession().getBroker().getQueues().destroyIf(
+                queue->getName(), 
+                boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
         }
     }
 }
 
-void Session::ConsumerImpl::requestDispatch()
+void SemanticState::ConsumerImpl::requestDispatch()
 {
     if(blocked)
         queue->requestDispatch(this);
 }
 
-void Session::handle(Message::shared_ptr msg) {
+void SemanticState::handle(Message::shared_ptr msg) {
     if (txBuffer.get()) {
         TxPublish* deliverable(new TxPublish(msg));
         TxOp::shared_ptr op(deliverable);
@@ -329,10 +312,10 @@
     }
 }
 
-void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) {
     std::string exchangeName = msg->getExchangeName();      
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
-        cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
+        cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
     }
 
     cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -347,17 +330,17 @@
 
 }
 
-void Session::ackCumulative(DeliveryId id)
+void SemanticState::ackCumulative(DeliveryId id)
 {
     ack(id, id, true);
 }
 
-void Session::ackRange(DeliveryId first, DeliveryId last)
+void SemanticState::ackRange(DeliveryId first, DeliveryId last)
 {
     ack(first, last, false);
 }
 
-void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
 {
     Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
     
@@ -373,7 +356,7 @@
         ++end;
     }
 
-    for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+    for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
     
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
@@ -398,7 +381,7 @@
     for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
 }
 
-void Session::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::acknowledged(const DeliveryRecord& delivery)
 {
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
@@ -407,7 +390,7 @@
     }
 }
 
-void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
 {
     if (windowing) {
         if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -415,7 +398,7 @@
     }
 }
 
-void Session::recover(bool requeue)
+void SemanticState::recover(bool requeue)
 {
     Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
 
@@ -431,12 +414,12 @@
     }
 }
 
-bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
 {
     QueuedMessage msg = queue->dequeue();
     if(msg.payload){
         Mutex::ScopedLock locker(deliveryLock);
-        DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
+        DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -446,7 +429,7 @@
     }
 }
 
-void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
                       DeliveryId deliveryTag)
 {
     ConsumerImplMap::iterator i = consumers.find(consumerTag);
@@ -455,7 +438,7 @@
     }
 }
 
-void Session::flow(bool active)
+void SemanticState::flow(bool active)
 {
     Mutex::ScopedLock locker(deliveryLock);
     bool requestDelivery(!flowActive && active);
@@ -467,7 +450,7 @@
 }
 
 
-Session::ConsumerImpl& Session::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
 {
     ConsumerImplMap::iterator i = consumers.find(destination);
     if (i == consumers.end()) {
@@ -477,62 +460,62 @@
     }
 }
 
-void Session::setWindowMode(const std::string& destination)
+void SemanticState::setWindowMode(const std::string& destination)
 {
     find(destination).setWindowMode();
 }
 
-void Session::setCreditMode(const std::string& destination)
+void SemanticState::setCreditMode(const std::string& destination)
 {
     find(destination).setCreditMode();
 }
 
-void Session::addByteCredit(const std::string& destination, uint32_t value)
+void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
     find(destination).addByteCredit(value);
 }
 
 
-void Session::addMessageCredit(const std::string& destination, uint32_t value)
+void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
     find(destination).addMessageCredit(value);
 }
 
-void Session::flush(const std::string& destination)
+void SemanticState::flush(const std::string& destination)
 {
     ConsumerImpl& c = find(destination);
     c.flush();
 }
 
 
-void Session::stop(const std::string& destination)
+void SemanticState::stop(const std::string& destination)
 {
     find(destination).stop();
 }
 
-void Session::ConsumerImpl::setWindowMode()
+void SemanticState::ConsumerImpl::setWindowMode()
 {
     windowing = true;
 }
 
-void Session::ConsumerImpl::setCreditMode()
+void SemanticState::ConsumerImpl::setCreditMode()
 {
     windowing = false;
 }
 
-void Session::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     byteCredit += value;
     requestDispatch();
 }
 
-void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     msgCredit += value;
     requestDispatch();
 }
 
-void Session::ConsumerImpl::flush()
+void SemanticState::ConsumerImpl::flush()
 {
     //need to prevent delivery after requestDispatch returns but
     //before credit is reduced to zero; TODO: come up with better
@@ -543,13 +526,13 @@
     msgCredit = 0;
 }
 
-void Session::ConsumerImpl::stop()
+void SemanticState::ConsumerImpl::stop()
 {
     msgCredit = 0;
     byteCredit = 0;
 }
 
-Queue::shared_ptr Session::getQueue(const string& name) const {
+Queue::shared_ptr SemanticState::getQueue(const string& name) const {
     //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
     Queue::shared_ptr queue;
     if (name.empty()) {
@@ -558,14 +541,14 @@
             throw NotAllowedException(QPID_MSG("No queue name specified."));
     }
     else {
-        queue = getBroker().getQueues().find(name);
+        queue = session.getBroker().getQueues().find(name);
         if (!queue)
             throw NotFoundException(QPID_MSG("Queue not found: "<<name));
     }
     return queue;
 }
 
-AckRange Session::findRange(DeliveryId first, DeliveryId last)
+AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
 {    
     ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
     ack_iterator end = start;
@@ -582,21 +565,21 @@
     return AckRange(start, end);
 }
 
-void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+void SemanticState::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
 {
     Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, AcquireFunctor(acquired));
 }
 
-void Session::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last)
 {
     Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
 }
 
-void Session::reject(DeliveryId first, DeliveryId last)
+void SemanticState::reject(DeliveryId first, DeliveryId last)
 {
     Mutex::ScopedLock locker(deliveryLock);
     AckRange range = findRange(first, last);

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (from r577734, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h&r1=577734&r2=578219&rev=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Sep 21 11:26:37 2007
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSION_H
-#define QPID_BROKER_SESSION_H
+#ifndef QPID_BROKER_SEMANTICSTATE_H
+#define QPID_BROKER_SEMANTICSTATE_H
 
 /*
  *
@@ -37,7 +37,7 @@
 #include "qpid/framing/Uuid.h"
 #include "qpid/shared_ptr.h"
 
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
 
 #include <list>
 #include <vector>
@@ -45,21 +45,19 @@
 namespace qpid {
 namespace broker {
 
-class SessionHandler;
-class Broker;
+class SessionState;
 
 /**
- * Session holds the state of an open session, whether attached to a
- * channel or suspended. It also holds the handler chains associated
- * with the session. 
+ * SemanticState holds the L3 and L4 state of an open session, whether
+ * attached to a channel or suspended. 
  */
-class Session : public framing::FrameHandler::Chains,
-                private boost::noncopyable
+class SemanticState : public framing::FrameHandler::Chains,
+                      private boost::noncopyable
 {
     class ConsumerImpl : public Consumer
     {
         sys::Mutex lock;
-        Session* const parent;
+        SemanticState* const parent;
         const DeliveryToken::shared_ptr token;
         const string name;
         const Queue::shared_ptr queue;
@@ -74,7 +72,7 @@
         bool checkCredit(Message::shared_ptr& msg);
 
       public:
-        ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, 
+        ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
@@ -94,13 +92,8 @@
 
     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
 
-    SessionHandler* adapter;
-    Broker& broker;
-    uint32_t timeout;
-    framing::Uuid id;
-    boost::ptr_vector<framing::FrameHandler>  handlers;
-
-    DeliveryAdapter* deliveryAdapter;
+    SessionState& session;
+    DeliveryAdapter& deliveryAdapter;
     Queue::shared_ptr defaultQueue;
     ConsumerImplMap consumers;
     uint32_t prefetchSize;    
@@ -113,7 +106,6 @@
     DtxBuffer::shared_ptr dtxBuffer;
     bool dtxSelected;
     framing::AccumulatedAck accumulatedAck;
-    bool opened;
     bool flowActive;
 
     boost::shared_ptr<Exchange> cacheExchange;
@@ -128,19 +120,10 @@
     AckRange findRange(DeliveryId first, DeliveryId last);
 
   public:
-    Session(SessionHandler&, uint32_t timeout);
-    ~Session();
-
-    /** Returns 0 if this session is not currently attached */
-    SessionHandler* getHandler() { return adapter; }
-    const SessionHandler* getHandler() const { return adapter; }
+    SemanticState(DeliveryAdapter&, SessionState&);
+    ~SemanticState();
 
-    Broker& getBroker() const { return broker; }
-    
-    /** Session timeout, aka detached-lifetime. */
-    uint32_t getTimeout() const { return timeout; }
-    /** Session ID */
-    const framing::Uuid& getId() const { return id; }
+    SessionState& getSession() { return session; }
     
     /**
      * Get named queue, never returns 0.
@@ -174,7 +157,6 @@
     void stop(const std::string& destination);
 
     bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
-    void close();
     void startTx();
     void commit(MessageStore* const store);
     void rollback();
@@ -198,4 +180,5 @@
 
 
 
-#endif  /*!QPID_BROKER_SESSION_H*/
+
+#endif  /*!QPID_BROKER_SEMANTICSTATE_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Sep 21 11:26:37 2007
@@ -19,7 +19,7 @@
  */
 
 #include "SessionHandler.h"
-#include "Session.h"
+#include "SessionState.h"
 #include "Connection.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
@@ -94,7 +94,7 @@
 
 void  SessionHandler::open(uint32_t detachedLifetime) {
     assertClosed("open");
-    session.reset(new Session(*this, detachedLifetime));
+    session.reset(new SessionState(*this, detachedLifetime));
     getProxy().getSession().attached(session->getId(), session->getTimeout());
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Sep 21 11:26:37 2007
@@ -31,7 +31,7 @@
 namespace broker {
 
 class Connection;
-class Session;
+class SessionState;
 
 /**
  * A SessionHandler is associated with each active channel. It
@@ -48,8 +48,8 @@
     ~SessionHandler();
 
     /** Returns 0 if not attached to a session */
-    Session* getSession() { return session.get(); }
-    const Session* getSession() const { return session.get(); }
+    SessionState* getSession() { return session.get(); }
+    const SessionState* getSession() const { return session.get(); }
 
     framing::ChannelId getChannel() const { return channel; }
     
@@ -84,7 +84,7 @@
     Connection& connection;
     const framing::ChannelId channel;
     framing::AMQP_ClientProxy proxy;
-    shared_ptr<Session> session;
+    shared_ptr<SessionState> session;
     bool ignoring;
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=578219&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Sep 21 11:26:37 2007
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "SessionHandler.h"
+#include "Connection.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+SessionState::SessionState(SessionHandler& h, uint32_t timeout_) 
+    : handler(&h), id(true), timeout(timeout_),
+      broker(h.getConnection().broker),
+      version(h.getConnection().getVersion())
+{
+    // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
+    chain.push_back(new SemanticHandler(*this));
+    in = &chain[0];             // Incoming frame to handler chain.
+    out = &handler->out;        // Outgoing frames to SessionHandler
+
+    // FIXME aconway 2007-09-20: use broker to add plugin
+    // handlers to the chain. 
+    // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
+    broker.update(handler->getChannel(), *this);       
+}
+
+SessionHandler& SessionState::getHandler() {
+    assert(isAttached());
+    return *handler;
+}
+
+AMQP_ClientProxy& SessionState::getProxy() {
+    return getHandler().getProxy();
+}
+    /** Convenience for: getHandler()->getConnection()
+     *@pre getHandler() != 0
+     */
+Connection& SessionState::getConnection() {
+    return getHandler().getConnection();
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Sep 21 11:26:37 2007
@@ -23,44 +23,73 @@
  */
 
 #include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/noncopyable.hpp>
+
 
 namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
 namespace broker {
 
+class SessionHandler;
+class Broker;
+class Connection;
+
 /**
  * State of a session.
+ *
+ * An attached session has a SessionHandler which is attached to a
+ * connection. A suspended session has no handler.
+ *
+ * A SessionState is always associated with an open session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * The SessionState includes the sessions handler chains, which may
+ * themselves have state. The handlers will be preserved as long as
+ * the session is alive.
  */
-class SessionState
+class SessionState : public framing::FrameHandler::Chains,
+                     private boost::noncopyable
 {
   public:
-    enum State { CLOSED, ACTIVE, SUSPENDED };
+    /** SessionState for a newly opened connection. */
+    SessionState(SessionHandler& h, uint32_t timeout_);
 
-    /** Initially in CLOSED state */
-    SessionState() : id(false), state(CLOSED), timeout(0) {}
+    bool isAttached() { return handler; }
 
-    /** Make CLOSED session ACTIVE, assigns a new UUID.
-     * #@param timeout in seconds
-     */
-    void open(u_int32_t timeout_) {
-        state=ACTIVE;  id.generate(); timeout=timeout_;
-    }
+    /** @pre isAttached() */
+    SessionHandler& getHandler();
 
-    /** Close a session. */
-    void close() { state=CLOSED; id.clear(); timeout=0; }
+    /** @pre isAttached() */
+    framing::AMQP_ClientProxy& getProxy();
+    
+    /** @pre isAttached() */
+    Connection& getConnection();
 
-    State getState() const { return state; }
     const framing::Uuid& getId() const { return id; }
     uint32_t getTimeout() const { return timeout; }
-
-    bool isOpen() { return state == ACTIVE; }
-    bool isClosed() { return state == CLOSED; }
-    bool isSuspended() { return state == SUSPENDED; }
+    Broker& getBroker() { return broker; }
+    framing::ProtocolVersion getVersion() const { return version; }
     
+
   private:
-  friend class SuspendedSessions;
+  friend class SessionHandler;  // Only SessionHandler can attach/detach
+    void detach() { handler=0; }
+    void attach(SessionHandler& h) { handler = &h; }
+
+    SessionHandler* handler;    
     framing::Uuid id;
-    State state;
     uint32_t timeout;
+    Broker& broker;
+    boost::ptr_vector<framing::FrameHandler> chain;
+    framing::ProtocolVersion version;
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h?rev=578219&r1=578218&r2=578219&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SuspendedSessions.h Fri Sep 21 11:26:37 2007
@@ -31,8 +31,10 @@
 namespace qpid {
 namespace broker {
 
-/** Collection of suspended sessions.
- * Thread safe.
+/**
+ * Thread safe collection of suspended sessions.
+ * Every session is owned either by a connection's SessionHandler
+ * or by the SuspendedSessions.
  */
 class SuspendedSessions {
     typedef std::multimap<sys::AbsTime,SessionState> Map;