You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/31 22:51:23 UTC

svn commit: r571575 [1/2] - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/tests/

Author: aconway
Date: Fri Aug 31 13:51:22 2007
New Revision: 571575

URL: http://svn.apache.org/viewvc?rev=571575&view=rev
Log:
	* Summary:
	 - Moved BrokerChannel functionality into Session.
	 - Moved ChannelHandler methods handling into SessionAdapter.
	 - Updated all handlers to use session.
	 (We're still using AMQP channel methods in SessionAdapter)

	 Roles & responsibilities:

	 Session:
	   - represents an _open_ session, may be active or suspended.
	   - owns all session state including handler chains.
	   - attached to SessionAdapter when active, not when suspended.
	 
	 SessionAdapter:
	  - represents the association of a channel with a session.
	  - owned by Connection, kept in the session map.
	  - channel open == SessionAdapter.getSession() != 0

	 Anything that depends on attachment to a channel, connection or
	 protocol should be in SessionAdapter. Anything that survives a
	 session suspend belongs in Session.

Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb Fri Aug 31 13:51:22 2007
@@ -13,14 +13,13 @@
   def generate()
     h_file("#{@dir}/constants") {
       namespace(@namespace) { 
-        @amqp.constants.each { |c|
-          genl "enum { #{c.name.shout} = #{c.value} };"
+        scope("enum AmqpConstant {","};") { 
+          genl @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }.join(",\n")
         }
       }
     }
     
     h_file("#{@dir}/reply_exceptions") {
-      include "constants"
       include "qpid/Exception"
       namespace(@namespace) {
         @amqp.constants.each { |c|
@@ -35,6 +34,7 @@
         }
       }
     }
+    
   end
 end
 

Propchange: incubator/qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Aug 31 13:51:22 2007
@@ -13,3 +13,4 @@
 generate.timestamp
 rubygen.timestamp
 generate_MethodHolderMaxSize_h
+.Tpo

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Aug 31 13:51:22 2007
@@ -159,7 +159,6 @@
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerAdapter.cpp \
   qpid/broker/BrokerSingleton.cpp \
-  qpid/broker/BrokerChannel.cpp \
   qpid/broker/BrokerExchange.cpp \
   qpid/broker/BrokerQueue.cpp \
   qpid/broker/Connection.cpp \
@@ -227,7 +226,6 @@
 nobase_include_HEADERS = \
   $(platform_hdr) \
   qpid/broker/AccumulatedAck.h \
-  qpid/broker/BrokerChannel.h \
   qpid/broker/BrokerExchange.h \
   qpid/broker/BrokerQueue.h \
   qpid/broker/Consumer.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Aug 31 13:51:22 2007
@@ -3,24 +3,24 @@
 # 
 lib_LTLIBRARIES += libqpidcluster.la
 
-if CLUSTER
+# if CLUSTER
 
-libqpidcluster_la_SOURCES = \
-  qpid/cluster/Cluster.cpp \
-  qpid/cluster/Cluster.h \
-  qpid/cluster/Cpg.cpp \
-  qpid/cluster/Cpg.h \
-  qpid/cluster/Dispatchable.h \
-  qpid/cluster/ClusterPlugin.cpp \
-  qpid/cluster/ClassifierHandler.h \
-  qpid/cluster/ClassifierHandler.cpp \
-  qpid/cluster/SessionManager.h \
-  qpid/cluster/SessionManager.cpp
+# libqpidcluster_la_SOURCES = \
+#   qpid/cluster/Cluster.cpp \
+#   qpid/cluster/Cluster.h \
+#   qpid/cluster/Cpg.cpp \
+#   qpid/cluster/Cpg.h \
+#   qpid/cluster/Dispatchable.h \
+#   qpid/cluster/ClusterPlugin.cpp \
+#   qpid/cluster/ClassifierHandler.h \
+#   qpid/cluster/ClassifierHandler.cpp \
+#   qpid/cluster/SessionManager.h \
+#   qpid/cluster/SessionManager.cpp
 
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
+# libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 
-else
+# else
 # Empty stub library to satisfy rpm spec file.
 libqpidcluster_la_SOURCES = 
 
-endif
+#endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -15,10 +15,9 @@
  * limitations under the License.
  *
  */
-#include <boost/format.hpp>
-
 #include "BrokerAdapter.h"
-#include "BrokerChannel.h"
+#include "Session.h"
+#include "SessionAdapter.h"
 #include "Connection.h"
 #include "DeliveryToken.h"
 #include "MessageDelivery.h"
@@ -28,18 +27,23 @@
 namespace qpid {
 namespace broker {
 
-using boost::format;
 using namespace qpid;
 using namespace qpid::framing;
 
 typedef std::vector<Queue::shared_ptr> QueueVector;
 
-
-    BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
-    CoreRefs(ch, c, b, a),
-    connection(c),
+// FIXME aconway 2007-08-31: now that functionality is distributed
+// between different handlers, BrokerAdapter should be dropped.
+// Instead the individual class Handler interfaces can be implemented
+// by the handlers responsible for those classes.
+//
+
+BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
+    CoreRefs(s,
+             s.getAdapter()->getConnection(),
+             s.getAdapter()->getConnection().broker,
+             a),
     basicHandler(*this),
-    channelHandler(*this),
     exchangeHandler(*this),
     bindingHandler(*this),
     messageHandler(*this),
@@ -52,31 +56,6 @@
 ProtocolVersion BrokerAdapter::getVersion() const {
     return connection.getVersion();
 }
-              
-void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
-    channel.open();
-    client.openOk();
-} 
-        
-void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
-    channel.flow(active);
-    client.flowOk(active);
-}         
-
-void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} 
-        
-void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
-    const string& /*replyText*/,
-    uint16_t /*classId*/, uint16_t /*methodId*/)
-{
-    client.closeOk();
-    // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
-    connection.closeChannel(channel.getId()); 
-} 
-        
-void BrokerAdapter::ChannelHandlerImpl::closeOk(){} 
-              
-
 
 void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, 
                                                  const string& alternateExchange, 
@@ -148,10 +127,10 @@
 }
 
 BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
-                                               const std::string& exchangeName,
-                                               const std::string& queueName,
-                                               const std::string& key,
-                                               const framing::FieldTable& args)
+                                                            const std::string& exchangeName,
+                                                            const std::string& queueName,
+                                                            const std::string& key,
+                                                            const framing::FieldTable& args)
 {
     Exchange::shared_ptr exchange;
     try {
@@ -181,7 +160,7 @@
 
 QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
 {
-    Queue::shared_ptr queue = getQueue(name);
+    Queue::shared_ptr queue = session.getQueue(name);
     Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
 
     return QueueQueryResult(queue->getName(), 
@@ -205,7 +184,7 @@
     }
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-	queue = getQueue(name);
+	queue = session.getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
@@ -216,7 +195,7 @@
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
-	    channel.setDefaultQueue(queue);
+	    session.setDefaultQueue(queue);
             if (alternate) {
                 queue->setAlternateExchange(alternate);
                 alternate->incAlternateUsers();
@@ -236,17 +215,16 @@
 	}
     }
     if (exclusive && !queue->isExclusiveOwner(&connection)) 
-	throw ChannelException(
-            405,
-            format("Cannot grant exclusive access to queue '%s'")
-            % queue->getName());
+	throw ResourceLockedException(
+            QPID_MSG("Cannot grant exclusive access to queue "
+                     << queue->getName()));
 } 
         
 void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, 
                                            const string& exchangeName, const string& routingKey, 
                                            const FieldTable& arguments){
 
-    Queue::shared_ptr queue = getQueue(queueName);
+    Queue::shared_ptr queue = session.getQueue(queueName);
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
     if(exchange){
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -257,23 +235,23 @@
             }
         }
     }else{
-        throw ChannelException(
-            404, "Bind failed. No such exchange: " + exchangeName);
+        throw NotFoundException(
+            "Bind failed. No such exchange: " + exchangeName);
     }
 }
  
 void 
 BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
-    const string& queueName,
-    const string& exchangeName,
-    const string& routingKey,
-    const qpid::framing::FieldTable& arguments )
+                                        const string& queueName,
+                                        const string& exchangeName,
+                                        const string& routingKey,
+                                        const qpid::framing::FieldTable& arguments )
 {
-    Queue::shared_ptr queue = getQueue(queueName);
-    if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+    Queue::shared_ptr queue = session.getQueue(queueName);
+    if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
 
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
-    if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
 
     if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
         broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
@@ -282,17 +260,16 @@
 }
         
 void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
-    getQueue(queue)->purge();
+    session.getQueue(queue)->purge();
 } 
         
-void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, 
-                                              bool ifUnused, bool ifEmpty){
+void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
     ChannelException error(0, "");
-    Queue::shared_ptr q = getQueue(queue);
+    Queue::shared_ptr q = session.getQueue(queue);
     if(ifEmpty && q->getMessageCount() > 0){
-        throw ChannelException(406, "Queue not empty.");
+        throw PreconditionFailedException("Queue not empty.");
     }else if(ifUnused && q->getConsumerCount() > 0){
-        throw ChannelException(406, "Queue in use.");
+        throw PreconditionFailedException("Queue in use.");
     }else{
         //remove the queue from the list of exclusive queues if necessary
         if(q->isExclusiveOwner(&connection)){
@@ -310,18 +287,18 @@
 
 void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
     //TODO: handle global
-    channel.setPrefetchSize(prefetchSize);
-    channel.setPrefetchCount(prefetchCount);
+    session.setPrefetchSize(prefetchSize);
+    session.setPrefetchCount(prefetchCount);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, 
-    const string& queueName, const string& consumerTag, 
-    bool noLocal, bool noAck, bool exclusive, 
-    bool nowait, const FieldTable& fields)
+                                              const string& queueName, const string& consumerTag, 
+                                              bool noLocal, bool noAck, bool exclusive, 
+                                              bool nowait, const FieldTable& fields)
 {
     
-    Queue::shared_ptr queue = getQueue(queueName);    
-    if(!consumerTag.empty() && channel.exists(consumerTag)){
+    Queue::shared_ptr queue = session.getQueue(queueName);    
+    if(!consumerTag.empty() && session.exists(consumerTag)){
         throw ConnectionException(530, "Consumer tags must be unique");
     }
     string newTag = consumerTag;
@@ -329,7 +306,7 @@
     //also version specific behaviour now)
     if (newTag.empty()) newTag = tagGenerator.generate();
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
-    channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+    session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
 
     if(!nowait) client.consumeOk(newTag);
 
@@ -338,13 +315,13 @@
 } 
         
 void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
-    channel.cancel(consumerTag);
+    session.cancel(consumerTag);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
-    Queue::shared_ptr queue = getQueue(queueName);    
+    Queue::shared_ptr queue = session.getQueue(queueName);    
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
-    if(!channel.get(token, queue, !noAck)){
+    if(!session.get(token, queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
 
         client.getEmpty(clusterId);
@@ -353,9 +330,9 @@
         
 void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
     if (multiple) {
-        channel.ackCumulative(deliveryTag);
+        session.ackCumulative(deliveryTag);
     } else {
-        channel.ackRange(deliveryTag, deliveryTag);
+        session.ackRange(deliveryTag, deliveryTag);
     }
 } 
         
@@ -363,29 +340,24 @@
         
 void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
 {
-    channel.recover(requeue);
+    session.recover(requeue);
 } 
 
 void BrokerAdapter::TxHandlerImpl::select()
 {
-    channel.startTx();
+    session.startTx();
 }
 
 void BrokerAdapter::TxHandlerImpl::commit()
 {
-    channel.commit(&broker.getStore());
+    session.commit(&broker.getStore());
 }
 
 void BrokerAdapter::TxHandlerImpl::rollback()
 {    
-    channel.rollback();
-    channel.recover(false);    
+    session.rollback();
+    session.recover(false);    
 }
               
-void BrokerAdapter::ChannelHandlerImpl::ok()
-{
-    //no specific action required, generic response handling should be sufficient
-}
-
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Fri Aug 31 13:51:22 2007
@@ -18,11 +18,13 @@
  * limitations under the License.
  *
  */
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "DtxHandlerImpl.h"
 #include "HandlerImpl.h"
 #include "MessageHandlerImpl.h"
-#include "DtxHandlerImpl.h"
+#include "NameGenerator.h"
 #include "qpid/Exception.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/reply_exceptions.h"
 
 namespace qpid {
 namespace broker {
@@ -48,18 +50,17 @@
  * Per-channel protocol adapter.
  *
  * A container for a collection of AMQP-class adapters that translate
- * AMQP method bodies into calls on the core Channel, Connection and
- * Broker objects. Each adapter class also provides a client proxy
- * to send methods to the peer.
+ * AMQP method bodies into calls on the core Broker objects. Each
+ * adapter class also provides a client proxy to send methods to the
+ * peer.
  * 
  */
 class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
 {
   public:
-    BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a);
+    BrokerAdapter(Session& session, framing::ChannelAdapter& a);
 
     framing::ProtocolVersion getVersion() const;
-    ChannelHandler* getChannelHandler() { return &channelHandler; }
     BasicHandler* getBasicHandler() { return &basicHandler; }
     ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
     BindingHandler* getBindingHandler() { return &bindingHandler; }
@@ -67,46 +68,27 @@
     TxHandler* getTxHandler() { return &txHandler;  }
     MessageHandler* getMessageHandler() { return &messageHandler;  }
     AccessHandler* getAccessHandler() {
-        throw ConnectionException(540, "Access class not implemented");  }
+        throw framing::NotImplementedException("Access class not implemented");  }
     FileHandler* getFileHandler() {
-        throw ConnectionException(540, "File class not implemented");  }
+        throw framing::NotImplementedException("File class not implemented");  }
     StreamHandler* getStreamHandler() {
-        throw ConnectionException(540, "Stream class not implemented");  }
+        throw framing::NotImplementedException("Stream class not implemented");  }
     TunnelHandler* getTunnelHandler() {
-        throw ConnectionException(540, "Tunnel class not implemented"); }
-    SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); }
-
+        throw framing::NotImplementedException("Tunnel class not implemented"); }
     DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
     DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
     ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
 
-    ConnectionHandler* getConnectionHandler() { 
-        throw ConnectionException(503, "Can't access connection class on non-zero channel!");        
-    }
+    // Handlers no longer implemented in BrokerAdapter:
+#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+    ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
+    SessionHandler* getSessionHandler() { BADHANDLER(); }
+    ChannelHandler* getChannelHandler() { BADHANDLER(); }
+#undef BADHANDLER
 
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
 
   private:
-
-    class ChannelHandlerImpl :
-        public ChannelHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Channel>
-    {
-      public:
-        ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-        
-        void open(const std::string& outOfBand); 
-        void flow(bool active); 
-        void flowOk(bool active); 
-        void ok(  );
-        void ping(  );
-        void pong(  );
-        void resume( const std::string& channelId );
-        void close(uint16_t replyCode, const
-                   std::string& replyText, uint16_t classId, uint16_t methodId); 
-        void closeOk(); 
-    };
-    
     class ExchangeHandlerImpl :
         public ExchangeHandler,
         public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
@@ -201,9 +183,7 @@
         void rollback();
     };
 
-    Connection& connection;
     BasicHandlerImpl basicHandler;
-    ChannelHandlerImpl channelHandler;
     ExchangeHandlerImpl exchangeHandler;
     BindingHandlerImpl bindingHandler;
     MessageHandlerImpl messageHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Aug 31 13:51:22 2007
@@ -23,7 +23,7 @@
 #include <assert.h>
 
 #include "Connection.h"
-#include "BrokerChannel.h"
+#include "Session.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "BrokerAdapter.h"
 #include "SemanticHandler.h"
@@ -52,12 +52,8 @@
     if (frame.getChannel() == 0) {
         adapter.handle(frame);
     } else {
-        // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr.
-        // OLD COMMENT:
-        // Assign handler to new shared_ptr, as it may be erased
-        // from the map by handle() if frame is a ChannelClose.
-        // 
-        getChannel((frame.getChannel())).in(frame);
+        SessionAdapter sa = getChannel(frame.getChannel());
+        sa.in(frame);
     }
 }
 
@@ -98,18 +94,12 @@
     if (i != channels.end()) channels.erase(i);
 }
 
-
-FrameHandler::Chains& Connection::getChannel(ChannelId id) {
-    // FIXME aconway 2007-08-29: Assuming session on construction,
-    // move this to SessionAdapter::open.
+SessionAdapter Connection::getChannel(ChannelId id) {
     boost::optional<SessionAdapter>& ch = channels[id];
     if (!ch) {
-        ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29: 
-        assert(ch->getSession());
-        broker.update(id, *ch->getSession());
+        ch = boost::in_place(boost::ref(*this), id); 
     }
-    assert(ch->getSession());
-    return *ch->getSession();
+    return *ch;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Aug 31 13:51:22 2007
@@ -36,7 +36,7 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "Broker.h"
 #include "qpid/Exception.h"
-#include "BrokerChannel.h"
+#include "Session.h"
 #include "ConnectionAdapter.h"
 #include "SessionAdapter.h"
 
@@ -45,19 +45,14 @@
 namespace qpid {
 namespace broker {
 
-class Channel;
-
 class Connection : public sys::ConnectionInputHandler, 
                    public ConnectionToken
 {
   public:
     Connection(sys::ConnectionOutputHandler* out, Broker& broker);
 
-    /** Get a channel. Create if it does not already exist */
-    framing::FrameHandler::Chains& getChannel(framing::ChannelId channel);
-
-    /** Close a channel */
-    void closeChannel(framing::ChannelId channel);
+    /** Get the SessionAdapter for channel. Create if it does not already exist */
+    SessionAdapter getChannel(framing::ChannelId channel);
 
     /** Close the connection */
     void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
@@ -84,7 +79,11 @@
     void idleIn();
     void closed();
 
+    // FIXME aconway 2007-08-30: When does closeChannel close the session?
+    void closeChannel(framing::ChannelId channel);
+
   private:
+
     // Use boost::optional to allow default-constructed uninitialized entries in the map.
     typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -97,7 +96,6 @@
     framing::AMQP_ClientProxy::Connection* client;
     uint64_t stagingThreshold;
     ConnectionAdapter adapter;
-
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 31 13:51:22 2007
@@ -19,7 +19,7 @@
  *
  */
 #include "DeliveryRecord.h"
-#include "BrokerChannel.h"
+#include "Session.h"
 
 using namespace qpid::broker;
 using std::string;
@@ -64,12 +64,12 @@
     return range->covers(deliveryTag);
 }
 
-void DeliveryRecord::redeliver(Channel* const channel) const{
+void DeliveryRecord::redeliver(Session* const session) const{
     if(pull){
         //if message was originally sent as response to get, we must requeue it
         requeue();
     }else{
-        channel->deliver(msg.payload, consumerTag, deliveryTag);
+        session->deliver(msg.payload, consumerTag, deliveryTag);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 31 13:51:22 2007
@@ -32,44 +32,44 @@
 #include "Prefetch.h"
 
 namespace qpid {
-    namespace broker {
-        class Channel;
+namespace broker {
+class Session;
 
-        /**
-         * Record of a delivery for which an ack is outstanding.
-         */
-        class DeliveryRecord{
-            mutable QueuedMessage msg;
-            mutable Queue::shared_ptr queue;
-            const std::string consumerTag;
-            const DeliveryId deliveryTag;
-            bool acquired;
-            const bool pull;
+/**
+ * Record of a delivery for which an ack is outstanding.
+ */
+class DeliveryRecord{
+    mutable QueuedMessage msg;
+    mutable Queue::shared_ptr queue;
+    const std::string consumerTag;
+    const DeliveryId deliveryTag;
+    bool acquired;
+    const bool pull;
 
-        public:
-            DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
-            DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+  public:
+    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
+    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
             
-            void dequeue(TransactionContext* ctxt = 0) const;
-            bool matches(DeliveryId tag) const;
-            bool matchOrAfter(DeliveryId tag) const;
-            bool after(DeliveryId tag) const;
-            bool coveredBy(const AccumulatedAck* const range) const;
-            void requeue() const;
-            void redeliver(Channel* const) const;
-            void updateByteCredit(uint32_t& credit) const;
-            void addTo(Prefetch&) const;
-            void subtractFrom(Prefetch&) const;
-            const std::string& getConsumerTag() const { return consumerTag; } 
-            bool isPull() const { return pull; }
-            bool isAcquired() const { return acquired; }
-            void setAcquired(bool isAcquired) { acquired = isAcquired; }
+    void dequeue(TransactionContext* ctxt = 0) const;
+    bool matches(DeliveryId tag) const;
+    bool matchOrAfter(DeliveryId tag) const;
+    bool after(DeliveryId tag) const;
+    bool coveredBy(const AccumulatedAck* const range) const;
+    void requeue() const;
+    void redeliver(Session* const) const;
+    void updateByteCredit(uint32_t& credit) const;
+    void addTo(Prefetch&) const;
+    void subtractFrom(Prefetch&) const;
+    const std::string& getConsumerTag() const { return consumerTag; } 
+    bool isPull() const { return pull; }
+    bool isAcquired() const { return acquired; }
+    void setAcquired(bool isAcquired) { acquired = isAcquired; }
             
-            friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
-        };
+  friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
+};
 
-        typedef std::list<DeliveryRecord>::iterator ack_iterator; 
-    }
+typedef std::list<DeliveryRecord>::iterator ack_iterator; 
+}
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Aug 31 13:51:22 2007
@@ -19,7 +19,7 @@
 
 #include <boost/format.hpp>
 #include "Broker.h"
-#include "BrokerChannel.h"
+#include "Session.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -41,7 +41,7 @@
 
 void DtxHandlerImpl::select()
 {
-    channel.selectDtx();
+    session.selectDtx();
 }
 
 DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -51,7 +51,7 @@
 {
     try {
         if (fail) {
-            channel.endDtx(xid, true);
+            session.endDtx(xid, true);
             if (suspend) {
                 throw ConnectionException(503, "End and suspend cannot both be set.");
             } else {
@@ -59,9 +59,9 @@
             }
         } else {
             if (suspend) {
-                channel.suspendDtx(xid);
+                session.suspendDtx(xid);
             } else {
-                channel.endDtx(xid, false);
+                session.endDtx(xid, false);
             }
             return DtxDemarcationEndResult(XA_OK);
         }
@@ -80,9 +80,9 @@
     }
     try {
         if (resume) {
-            channel.resumeDtx(xid);
+            session.resumeDtx(xid);
         } else {
-            channel.startDtx(xid, broker.getDtxManager(), join);
+            session.startDtx(xid, broker.getDtxManager(), join);
         }
         return DtxDemarcationStartResult(XA_OK);
     } catch (const DtxTimeoutException& e) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Fri Aug 31 13:51:22 2007
@@ -20,19 +20,13 @@
  */
 
 #include "Broker.h"
-#include "BrokerChannel.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 
 namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
 namespace broker {
 
-    //class Channel;
 class Connection;
+class Session;
 
 /**
  * A collection of references to the core objects required by an adapter,
@@ -40,36 +34,14 @@
  */
 struct CoreRefs
 {
-    CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
-        : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {}
+    CoreRefs(Session& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
+        : session(ch), connection(c), broker(b), adapter(a), proxy(a) {}
 
-    Channel& channel;
+    Session& session;
     Connection& connection;
     Broker& broker;
     framing::ChannelAdapter& adapter;
     framing::AMQP_ClientProxy proxy;
-
-    /**
-     * Get named queue, never returns 0.
-     * @return: named queue or default queue for channel if name=""
-     * @exception: ChannelException if no queue of that name is found.
-     * @exception: ConnectionException if name="" and channel has no default.
-     */
-    Queue::shared_ptr getQueue(const string& name) {
-        //Note: this can be removed soon as the default queue for channels is scrapped in 0-10
-        Queue::shared_ptr queue;
-        if (name.empty()) {
-            queue = channel.getDefaultQueue();
-            if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
-        } else {
-            queue = broker.getQueues().find(name);
-            if (queue == 0) {
-                throw ChannelException( 404, "Queue not found: " + name);
-            }
-        }
-        return queue;
-    }
-
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Aug 31 13:51:22 2007
@@ -18,7 +18,7 @@
 
 #include "qpid/QpidError.h"
 #include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
+#include "Session.h"
 #include "qpid/framing/FramingContent.h"
 #include "Connection.h"
 #include "Broker.h"
@@ -45,7 +45,7 @@
 void
 MessageHandlerImpl::cancel(const string& destination )
 {
-    channel.cancel(destination);
+    session.cancel(destination);
 }
 
 void
@@ -96,12 +96,12 @@
                             bool exclusive,
                             const framing::FieldTable& filter )
 {
-    Queue::shared_ptr queue = getQueue(queueName);
-    if(!destination.empty() && channel.exists(destination))
+    Queue::shared_ptr queue = session.getQueue(queueName);
+    if(!destination.empty() && session.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
 
     string tag = destination;
-    channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
+    session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
                     tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
@@ -114,9 +114,9 @@
                          const string& destination,
                          bool noAck )
 {
-    Queue::shared_ptr queue = getQueue(queueName);
+    Queue::shared_ptr queue = session.getQueue(queueName);
     
-    if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+    if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
         //don't send any response... rely on execution completion
     } else {
         //temporarily disabled:
@@ -145,14 +145,14 @@
                         bool /*global*/ )
 {
     //TODO: handle global
-    channel.setPrefetchSize(prefetchSize);
-    channel.setPrefetchCount(prefetchCount);
+    session.setPrefetchSize(prefetchSize);
+    session.setPrefetchCount(prefetchCount);
 }
 
 void
 MessageHandlerImpl::recover(bool requeue)
 {
-    channel.recover(requeue);
+    session.recover(requeue);
 }
 
 void
@@ -166,10 +166,10 @@
     
     if (unit == 0) {
         //message
-        channel.addMessageCredit(destination, value);
+        session.addMessageCredit(destination, value);
     } else if (unit == 1) {
         //bytes
-        channel.addByteCredit(destination, value);
+        session.addByteCredit(destination, value);
     } else {
         //unknown
         throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -181,10 +181,10 @@
 {
     if (mode == 0) {
         //credit
-        channel.setCreditMode(destination);
+        session.setCreditMode(destination);
     } else if (mode == 1) {
         //window
-        channel.setWindowMode(destination);
+        session.setWindowMode(destination);
     } else{
         throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);        
     }
@@ -192,12 +192,12 @@
     
 void MessageHandlerImpl::flush(const std::string& destination)
 {
-    channel.flush(destination);        
+    session.flush(destination);        
 }
 
 void MessageHandlerImpl::stop(const std::string& destination)
 {
-    channel.stop(destination);        
+    session.stop(destination);        
 }
 
 void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Aug 31 13:51:22 2007
@@ -20,25 +20,29 @@
  */
 
 #include "SemanticHandler.h"
-
-#include "boost/format.hpp"
+#include "Session.h"
+#include "SessionAdapter.h"
 #include "BrokerAdapter.h"
 #include "MessageDelivery.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ChannelCloseOkBody.h"
+#include "Connection.h"
+#include "Session.h"
 #include "qpid/framing/ExecutionCompleteBody.h"
 #include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/ChannelOpenBody.h"
 #include "qpid/framing/InvocationVisitor.h"
 
+#include <boost/format.hpp>
+
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : 
-    connection(c), channel(c, *this, id)
+SemanticHandler::SemanticHandler(Session& s) :
+    session(s),
+    connection(s.getAdapter()->getConnection()),
+    adapter(s, static_cast<ChannelAdapter&>(*this))
 {
-    init(id, connection.getOutput(), connection.getVersion());
-    adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+    init(s.getAdapter()->getChannel(), s.out, 0);
 }
 
 void SemanticHandler::handle(framing::AMQFrame& frame) 
@@ -60,35 +64,18 @@
     //open. execute it (i.e. out-of order execution with respect to
     //the command id sequence) or queue it up?
 
-    try{
-
-        TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
-        
-        switch(track) {   
-        case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
-            handleL2(frame.castBody<AMQMethodBody>());
-            break;
-        case EXECUTION_CONTROL_TRACK:
-            handleL3(frame.castBody<AMQMethodBody>());
-            break;
-        case MODEL_COMMAND_TRACK:
-            if (!isOpen()) {
-                throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
-            }
-            handleCommand(frame.castBody<AMQMethodBody>());
-            break;
-        case MODEL_CONTENT_TRACK:
-            handleContent(frame);
-            break;
-        }
+    TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
         
-    }catch(const ChannelException& e){
-        adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
-        connection.closeChannel(getId());
-    }catch(const ConnectionException& e){
-        connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
-    }catch(const std::exception& e){
-        connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
+    switch(track) {   
+      case EXECUTION_CONTROL_TRACK:
+        handleL3(frame.getMethod());
+        break;
+      case MODEL_COMMAND_TRACK:
+        handleCommand(frame.getMethod());
+        break;
+      case MODEL_CONTENT_TRACK:
+        handleContent(frame);
+        break;
     }
 }
 
@@ -99,13 +86,13 @@
     if (outgoing.lwm < mark) {
         outgoing.lwm = mark;
         //ack messages:
-        channel.ackCumulative(mark.getValue());
+        session.ackCumulative(mark.getValue());
     }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
         for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
-            channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+            session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
         }
     }
 }
@@ -141,7 +128,7 @@
 void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
 {
     ++(incoming.lwm);                        
-    InvocationVisitor v(adapter.get());
+    InvocationVisitor v(&adapter);
     method->accept(v);
     //TODO: need to account for async store operations and interleaving
     ++(incoming.hwm);                                    
@@ -153,17 +140,6 @@
     }
 }
 
-void SemanticHandler::handleL2(framing::AMQMethodBody* method)
-{
-    if(!method->isA<ChannelOpenBody>() && !isOpen()) {
-        if (!method->isA<ChannelCloseOkBody>()) {
-            throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
-        }
-    } else {
-        method->invoke(adapter->getChannelHandler());
-    }
-}
-
 void SemanticHandler::handleL3(framing::AMQMethodBody* method)
 {
     if (!method->invoke(this)) {
@@ -181,16 +157,16 @@
     msgBuilder.handle(frame);
     if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
         msg->setPublisher(&connection);
-        channel.handle(msg);
+        session.handle(msg);
         msgBuilder.end();
         //TODO: need to account for async store operations and interleaving
         ++(incoming.hwm);                
     }
 }
 
-bool SemanticHandler::isOpen() const 
-{ 
-    return channel.isOpen(); 
+bool SemanticHandler::isOpen() const {
+    // FIXME aconway 2007-08-30: remove.
+    return true;
 }
 
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -210,45 +186,39 @@
 void SemanticHandler::send(const AMQBody& body)
 {
     Mutex::ScopedLock l(outLock);
-    if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
-        //temporary hack until channel management is moved to its own handler:
+    // FIXME aconway 2007-08-31: SessionAdapter should not send
+    // channel/session commands  via the semantic handler, it should shortcut
+    // directly to its own output handler. That will make the CLASS_ID
+    // part of the test unnecessary.
+    // 
+    if (body.getMethod() &&
+        body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
+    {
         ++outgoing.hwm;
     }
     ChannelAdapter::send(body);
 }
 
-uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
-{
-    return frame.getBody()->type() == METHOD_BODY ?  frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
-}
-
-uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
-{
-    return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
-}
-
 SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
 {
     //will be replaced by field in 0-10 frame header
     uint8_t type = frame.getBody()->type();
     uint16_t classId;
     switch(type) {
-    case METHOD_BODY:
+      case METHOD_BODY:
         if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
             return MODEL_CONTENT_TRACK;
         }
 
         classId = frame.castBody<AMQMethodBody>()->amqpClassId();
         switch (classId) {
-        case ChannelOpenBody::CLASS_ID:
-            return SESSION_CONTROL_TRACK;
-        case ExecutionCompleteBody::CLASS_ID:
+          case ExecutionCompleteBody::CLASS_ID:
             return EXECUTION_CONTROL_TRACK;
         }
 
         return MODEL_COMMAND_TRACK;
-    case HEADER_BODY:
-    case CONTENT_BODY:
+      case HEADER_BODY:
+      case CONTENT_BODY:
         return MODEL_CONTENT_TRACK;
     }
     throw Exception("Could not determine track");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Aug 31 13:51:22 2007
@@ -22,14 +22,15 @@
 #define _SemanticHandler_
 
 #include <memory>
-#include "BrokerChannel.h"
-#include "Connection.h"
+#include "BrokerAdapter.h"
 #include "DeliveryAdapter.h"
 #include "MessageBuilder.h"
+
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/ChannelAdapter.h"
 
 namespace qpid {
 
@@ -42,34 +43,31 @@
 
 namespace broker {
 
-class BrokerAdapter;
-class framing::ChannelAdapter;
+class Session;
 
-class SemanticHandler : private framing::ChannelAdapter, 
-    private DeliveryAdapter,
-    public framing::FrameHandler, 
-    public framing::AMQP_ServerOperations::ExecutionHandler
+class SemanticHandler : public DeliveryAdapter,
+                        public framing::FrameHandler, 
+                        public framing::AMQP_ServerOperations::ExecutionHandler,
+                        private framing::ChannelAdapter
 {
+    Session& session;
     Connection& connection;
-    Channel channel;
-    std::auto_ptr<BrokerAdapter> adapter;
+    BrokerAdapter adapter;
     framing::Window incoming;
     framing::Window outgoing;
     sys::Mutex outLock;
     MessageBuilder msgBuilder;
 
-    enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+    enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
     TrackId getTrack(const framing::AMQFrame& frame);
-    uint16_t getClassId(const framing::AMQFrame& frame);
-    uint16_t getMethodId(const framing::AMQFrame& frame);
 
     void handleL3(framing::AMQMethodBody* method);
-    void handleL2(framing::AMQMethodBody* method);
     void handleCommand(framing::AMQMethodBody* method);
     void handleContent(framing::AMQFrame& frame);
 
     //ChannelAdapter virtual methods:
     void handleMethod(framing::AMQMethodBody* method);
+    
     bool isOpen() const;
     void handleHeader(framing::AMQHeaderBody*);
     void handleContent(framing::AMQContentBody*);
@@ -83,10 +81,13 @@
     void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
 
 public:
-    SemanticHandler(framing::ChannelId id, Connection& c);
+    SemanticHandler(Session& session);
 
     //frame handler:
     void handle(framing::AMQFrame& frame);
+
+    // FIXME aconway 2007-08-31: Move proxy to Session.
+    framing::AMQP_ClientProxy& getProxy() { return adapter.getProxy(); }
 
     //execution class method handlers:
     void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);    

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Fri Aug 31 13:51:22 2007
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,22 +20,543 @@
  */
 
 #include "Session.h"
+
+#include "BrokerAdapter.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
+#include "DtxTimeout.h"
+#include "Message.h"
 #include "SemanticHandler.h"
 #include "SessionAdapter.h"
+#include "TxAck.h"
+#include "TxPublish.h"
+#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <assert.h>
+
 
 namespace qpid {
 namespace broker {
 
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
 Session::Session(SessionAdapter& a, uint32_t t)
-    : adapter(&a), timeout(t)
+    : adapter(&a),
+      broker(adapter->getConnection().broker),
+      timeout(t),
+      prefetchSize(0),
+      prefetchCount(0),
+      tagGenerator("sgen"),
+      dtxSelected(false),
+      accumulatedAck(0),
+      flowActive(true)
 {
-    assert(adapter);
+    outstanding.reset();
     // FIXME aconway 2007-08-29: handler to get Session, not connection.
-    handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection()));
+    std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
+    deliveryAdapter=semantic.get();
+    // FIXME aconway 2007-08-31: Remove, workaround.
+    semanticHandler=semantic.get();
+    handlers.push_back(semantic.release());
     in = &handlers[0];
-    out = &adapter->getConnection().getOutput();
+    out = &adapter->out;
+    // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
+    // create a SessionManager in the broker for all session related
+    // stuff: suspended sessions, handler updaters etc.
+    // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
+    broker.update(a.getChannel(), *this);       
+}
+
+Session::~Session() {
+    close();
+}
+
+bool Session::exists(const string& consumerTag){
+    return consumers.find(consumerTag) != consumers.end();
+}
+
+void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
+                      Queue::shared_ptr queue, bool nolocal, bool acks,
+                      bool exclusive, const FieldTable*)
+{
+    if(tagInOut.empty())
+        tagInOut = tagGenerator.generate();
+    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+    queue->consume(c.get(), exclusive);//may throw exception
+    consumers.insert(tagInOut, c.release());
+}
+
+void Session::cancel(const string& tag){
+    // consumers is a ptr_map so erase will delete the consumer
+    // which will call cancel.
+    ConsumerImplMap::iterator i = consumers.find(tag);
+    if (i != consumers.end())
+        consumers.erase(i); 
+}
+
+void Session::close()
+{
+    opened = false;
+    consumers.clear();
+    if (dtxBuffer.get()) {
+        dtxBuffer->fail();
+    }
+    recover(true);
+}
+
+void Session::startTx()
+{
+    txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+}
+
+void Session::commit(MessageStore* const store)
+{
+    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    txBuffer->enlist(txAck);
+    if (txBuffer->commitLocal(store)) {
+        accumulatedAck.clear();
+    }
+}
+
+void Session::rollback()
+{
+    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+
+    txBuffer->rollback();
+    accumulatedAck.clear();
+}
+
+void Session::selectDtx()
+{
+    dtxSelected = true;
+}
+
+void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
+    if (!dtxSelected) {
+        throw ConnectionException(503, "Session has not been selected for use with dtx");
+    }
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+    if (join) {
+        mgr.join(xid, dtxBuffer);
+    } else {
+        mgr.start(xid, dtxBuffer);
+    }
+}
+
+void Session::endDtx(const std::string& xid, bool fail)
+{
+    if (!dtxBuffer) {
+        throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+    }
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+
+    txBuffer.reset();//ops on this session no longer transactional
+
+    checkDtxTimeout();
+    if (fail) {
+        dtxBuffer->fail();
+    } else {
+        dtxBuffer->markEnded();
+    }    
+    dtxBuffer.reset();
+}
+
+void Session::suspendDtx(const std::string& xid)
+{
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+    txBuffer.reset();//ops on this session no longer transactional
+
+    checkDtxTimeout();
+    dtxBuffer->setSuspended(true);
+}
+
+void Session::resumeDtx(const std::string& xid)
+{
+    if (dtxBuffer->getXid() != xid) {
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+    if (!dtxBuffer->isSuspended()) {
+        throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+    }
+
+    checkDtxTimeout();
+    dtxBuffer->setSuspended(false);
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
 }
 
+void Session::checkDtxTimeout()
+{
+    if (dtxBuffer->isExpired()) {
+        dtxBuffer.reset();
+        throw DtxTimeoutException();
+    }
+}
+
+void Session::record(const DeliveryRecord& delivery)
+{
+    unacked.push_back(delivery);
+    delivery.addTo(outstanding);
+}
+
+bool Session::checkPrefetch(Message::shared_ptr& msg)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    bool countOk = !prefetchCount || prefetchCount > unacked.size();
+    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+    return countOk && sizeOk;
+}
+
+Session::ConsumerImpl::ConsumerImpl(Session* _parent, 
+                                    DeliveryToken::shared_ptr _token,
+                                    const string& _name, 
+                                    Queue::shared_ptr _queue, 
+                                    bool ack,
+                                    bool _nolocal,
+                                    bool _acquire
+) : parent(_parent), 
+    token(_token), 
+    name(_name), 
+    queue(_queue), 
+    ackExpected(ack), 
+    nolocal(_nolocal),
+    acquire(_acquire),
+    blocked(false), 
+    windowing(true), 
+    msgCredit(0xFFFFFFFF), 
+    byteCredit(0xFFFFFFFF) {}
+
+bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+{
+    if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) {
+        return false;
+    } else {
+        if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
+            blocked = true;
+        } else {
+            blocked = false;
+
+            Mutex::ScopedLock locker(parent->deliveryLock);
+
+            DeliveryId deliveryTag =
+                parent->deliveryAdapter->deliver(msg.payload, token);
+            if (ackExpected) {
+                parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+            }
+        }
+        return !blocked;
+    }
+}
+
+bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+    Mutex::ScopedLock l(lock);
+    if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+        return false;
+    } else {
+        if (msgCredit != 0xFFFFFFFF) {
+            msgCredit--;
+        }
+        if (byteCredit != 0xFFFFFFFF) {
+            byteCredit -= msg->getRequiredCredit();
+        }
+        return true;
+    }
+}
+
+void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+    Mutex::ScopedLock locker(parent->deliveryLock);
+    parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+}
+
+Session::ConsumerImpl::~ConsumerImpl() {
+    cancel();
+}
+
+void Session::ConsumerImpl::cancel()
+{
+    if(queue) {
+        queue->cancel(this);
+        if (queue->canAutoDelete()) {            
+            parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(), 
+                                                                               boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+        }
+    }
+}
+
+void Session::ConsumerImpl::requestDispatch()
+{
+    if(blocked)
+        queue->requestDispatch();
+}
+
+void Session::handle(Message::shared_ptr msg) {
+    if (txBuffer.get()) {
+        TxPublish* deliverable(new TxPublish(msg));
+        TxOp::shared_ptr op(deliverable);
+        route(msg, *deliverable);
+        txBuffer->enlist(op);
+    } else {
+        DeliverableMessage deliverable(msg);
+        route(msg, deliverable);
+    }
+}
 
+void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+    std::string exchangeName = msg->getExchangeName();      
+    if (!cacheExchange || cacheExchange->getName() != exchangeName){
+        cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
+    }
+
+    cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+
+    if (!strategy.delivered) {
+        //TODO:if reject-unroutable, then reject
+        //else route to alternate exchange
+        if (cacheExchange->getAlternate()) {
+            cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+        }
+    }
+
+}
+
+void Session::ackCumulative(DeliveryId id)
+{
+    ack(id, id, true);
+}
+
+void Session::ackRange(DeliveryId first, DeliveryId last)
+{
+    ack(first, last, false);
+}
+
+void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+    
+    ack_iterator start = cumulative ? unacked.begin() : 
+        find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+    ack_iterator end = start;
+     
+    if (cumulative || first != last) {
+        //need to find end (position it just after the last record in range)
+        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+    } else {
+        //just acked single element (move end past it)
+        ++end;
+    }
+
+    for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+    
+    if (txBuffer.get()) {
+        //in transactional mode, don't dequeue or remove, just
+        //maintain set of acknowledged messages:
+        accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+        
+        if (dtxBuffer.get()) {
+            //if enlisted in a dtx, remove the relevant slice from
+            //unacked and record it against that transaction
+            TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+            accumulatedAck.clear();
+            dtxBuffer->enlist(txAck);    
+        }
+    } else {
+        for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+        unacked.erase(start, end);
+    }
+    
+    //if the prefetch limit had previously been reached, or credit
+    //had expired in windowing mode there may be messages that can
+    //now be delivered
+    for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Session::acknowledged(const DeliveryRecord& delivery)
+{
+    delivery.subtractFrom(outstanding);
+    ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+    if (i != consumers.end()) {
+        i->acknowledged(delivery);
+    }
+}
+
+void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+    if (windowing) {
+        if (msgCredit != 0xFFFFFFFF) msgCredit++;
+        if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+    }
+}
+
+/**
+ * Recover the unacknowledged deliveries of this session.
+ *
+ * @param requeue if true, the outstanding totals are reset and every
+ * unacked record is requeued, walking the list in reverse; if false,
+ * every unacked record is redelivered through this session.
+ */
+void Session::recover(bool requeue)
+{
+    //need to synchronize with possible concurrent delivery
+    Mutex::ScopedLock locker(deliveryLock);
+
+    if (!requeue) {
+        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+        return;
+    }
+
+    outstanding.reset();
+    //requeue may result in redelivery to this session, which in turn adds
+    //to unacked; so work from a copy and empty unacked first
+    std::list<DeliveryRecord> pending = unacked;
+    unacked.clear();
+    for_each(pending.rbegin(), pending.rend(), mem_fun_ref(&DeliveryRecord::requeue));
+}
+
+/**
+ * Dequeue one message from queue and hand it to the delivery adapter.
+ *
+ * @param token passed through to the delivery adapter.
+ * @param queue queue the message is taken from.
+ * @param ackExpected if true the delivery is recorded in unacked.
+ * @return true if a message was delivered, false if dequeue() yielded
+ * no payload.
+ */
+bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
+    QueuedMessage msg = queue->dequeue();
+    if (!msg.payload) return false;
+
+    Mutex::ScopedLock locker(deliveryLock);
+    DeliveryId tag = deliveryAdapter->deliver(msg.payload, token);
+    if (ackExpected) unacked.push_back(DeliveryRecord(msg, queue, tag));
+    return true;
+}
+
+/**
+ * Redeliver msg with the given delivery tag through the consumer
+ * registered under consumerTag. Does nothing if there is no such consumer.
+ */
+void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+                      DeliveryId deliveryTag)
+{
+    ConsumerImplMap::iterator consumer = consumers.find(consumerTag);
+    if (consumer == consumers.end()) return;
+    consumer->redeliver(msg, deliveryTag);
+}
+
+/**
+ * Record the new flow state. When flow switches from inactive to active,
+ * every consumer is asked to request dispatch.
+ */
+void Session::flow(bool active)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    const bool resumed = active && !flowActive;
+    flowActive = active;
+    if (!resumed) return;
+    //there may be messages that can now be delivered
+    std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+
+/**
+ * Look up the consumer registered under destination.
+ * @exception NotFoundException if no consumer has that name.
+ */
+Session::ConsumerImpl& Session::find(const std::string& destination)
+{
+    ConsumerImplMap::iterator found = consumers.find(destination);
+    if (found != consumers.end())
+        return *found;
+    throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+}
+
+/** Put the consumer registered under destination into window mode; find() throws if it is unknown. */
+void Session::setWindowMode(const std::string& destination)
+{
+    ConsumerImpl& consumer = find(destination);
+    consumer.setWindowMode();
+}
+
+/** Put the consumer registered under destination into credit mode; find() throws if it is unknown. */
+void Session::setCreditMode(const std::string& destination)
+{
+    ConsumerImpl& consumer = find(destination);
+    consumer.setCreditMode();
+}
+
+/** Add value to the byte credit of the consumer registered under destination; find() throws if it is unknown. */
+void Session::addByteCredit(const std::string& destination, uint32_t value)
+{
+    ConsumerImpl& consumer = find(destination);
+    consumer.addByteCredit(value);
+}
+
+
+/** Add value to the message credit of the consumer registered under destination; find() throws if it is unknown. */
+void Session::addMessageCredit(const std::string& destination, uint32_t value)
+{
+    ConsumerImpl& consumer = find(destination);
+    consumer.addMessageCredit(value);
+}
+
+/** Flush the consumer registered under destination; find() throws if it is unknown. */
+void Session::flush(const std::string& destination)
+{
+    find(destination).flush();
+}
+
+
+/** Stop the consumer registered under destination; find() throws if it is unknown. */
+void Session::stop(const std::string& destination)
+{
+    ConsumerImpl& consumer = find(destination);
+    consumer.stop();
+}
+
+/** Enable window mode: while set, acknowledged() replenishes this consumer's credit. */
+void Session::ConsumerImpl::setWindowMode()
+{
+    windowing = true;
+}
+
+/** Disable window mode: acknowledged() then leaves this consumer's credit unchanged. */
+void Session::ConsumerImpl::setCreditMode()
+{
+    windowing = false;
+}
+
+/**
+ * Grant this consumer additional byte credit, then request dispatch.
+ *
+ * The addition saturates at 0xFFFFFFFF instead of wrapping around.
+ * acknowledged() treats a credit of 0xFFFFFFFF as a special value that
+ * is never adjusted, so a plain += on such a credit (or any sum that
+ * overflows uint32_t) would silently turn it into a small finite amount.
+ *
+ * @param value number of bytes of credit to add.
+ */
+void Session::ConsumerImpl::addByteCredit(uint32_t value)
+{
+    const uint32_t maxCredit = 0xFFFFFFFF;
+    // Saturating add: compare against the remaining headroom so the
+    // check itself cannot overflow.
+    byteCredit = (value > maxCredit - byteCredit) ? maxCredit : byteCredit + value;
+    requestDispatch();
+}
+
+/**
+ * Grant this consumer additional message credit, then request dispatch.
+ *
+ * The addition saturates at 0xFFFFFFFF instead of wrapping around.
+ * acknowledged() treats a credit of 0xFFFFFFFF as a special value that
+ * is never incremented, so a plain += on such a credit (or any sum that
+ * overflows uint32_t) would silently turn it into a small finite amount.
+ *
+ * @param value number of messages of credit to add.
+ */
+void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+    const uint32_t maxCredit = 0xFFFFFFFF;
+    // Saturating add: compare against the remaining headroom so the
+    // check itself cannot overflow.
+    msgCredit = (value > maxCredit - msgCredit) ? maxCredit : msgCredit + value;
+    requestDispatch();
+}
+
+/** Currently a no-op placeholder; the intended behaviour is described in the TODO below. */
+void Session::ConsumerImpl::flush()
+{
+    //TODO: need to wait until any messages that are available for
+    //this consumer have been delivered... i.e. some sort of flush on
+    //the queue...
+}
+
+/** Reset both the message credit and the byte credit of this consumer to zero. */
+void Session::ConsumerImpl::stop()
+{
+    msgCredit = 0;
+    byteCredit = 0;
+}
+
+/**
+ * Resolve a queue by name; an empty name selects the session's default
+ * queue. Never returns a null pointer.
+ *
+ * @exception NotAllowedException if name is empty and no default queue is set.
+ * @exception NotFoundException if the broker's queue registry has no queue
+ * called name.
+ */
+Queue::shared_ptr Session::getQueue(const string& name) const {
+    //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
+    if (name.empty()) {
+        Queue::shared_ptr fallback = getDefaultQueue();
+        if (!fallback)
+            throw NotAllowedException(QPID_MSG("No queue name specified."));
+        return fallback;
+    }
+    Queue::shared_ptr named = getBroker().getQueues().find(name);
+    if (!named)
+        throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+    return named;
+}
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Fri Aug 31 13:51:22 2007
@@ -2,6 +2,7 @@
 #define QPID_BROKER_SESSION_H
 
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,14 +22,35 @@
  *
  */
 
+#include "AccumulatedAck.h"
+#include "Consumer.h"
+#include "Deliverable.h"
+#include "DeliveryAdapter.h"
+#include "DeliveryRecord.h"
+#include "DeliveryToken.h"
+#include "DtxBuffer.h"
+#include "DtxManager.h"
+#include "NameGenerator.h"
+#include "Prefetch.h"
+#include "TxBuffer.h"
+#include "SemanticHandler.h"  // FIXME aconway 2007-08-31: remove
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/shared_ptr.h"
 
 #include <boost/ptr_container/ptr_vector.hpp>
 
+#include <list>
+
 namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
 namespace broker {
 
 class SessionAdapter;
+class Broker;
 
 /**
  * Session holds the state of an open session, whether attached to a
@@ -38,19 +60,145 @@
 class Session : public framing::FrameHandler::Chains,
                 private boost::noncopyable
 {
+    /**
+     * Per-consumer state owned by a Session (held in ConsumerImplMap).
+     * Member functions are defined outside this header.
+     */
+    class ConsumerImpl : public Consumer
+    {
+        sys::Mutex lock;
+        Session* const parent;
+        const DeliveryToken::shared_ptr token;
+        const string name;
+        const Queue::shared_ptr queue;
+        const bool ackExpected;
+        const bool nolocal;
+        const bool acquire;
+        bool blocked;
+        // Set by setWindowMode(), cleared by setCreditMode().
+        bool windowing;
+        // Credit counters; acknowledged() never adjusts a value of 0xFFFFFFFF.
+        uint32_t msgCredit;
+        uint32_t byteCredit;
+
+        bool checkCredit(Message::shared_ptr& msg);
+
+      public:
+        ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, 
+                     const string& name, Queue::shared_ptr queue,
+                     bool ack, bool nolocal, bool acquire=true);
+        ~ConsumerImpl();
+        bool deliver(QueuedMessage& msg);            
+        void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
+        void cancel();
+        void requestDispatch();
+
+        // Credit / flow control, driven by the same-named Session methods.
+        void setWindowMode();
+        void setCreditMode();
+        void addByteCredit(uint32_t value);
+        void addMessageCredit(uint32_t value);
+        void flush();
+        void stop();
+        void acknowledged(const DeliveryRecord&);    
+    };
+
+    typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+
+    SessionAdapter* adapter;
+    Broker& broker;
+    uint32_t timeout;
+    boost::ptr_vector<framing::FrameHandler>  handlers;
+
+    DeliveryAdapter* deliveryAdapter;
+    Queue::shared_ptr defaultQueue;
+    ConsumerImplMap consumers;
+    uint32_t prefetchSize;    
+    uint16_t prefetchCount;    
+    Prefetch outstanding;
+    NameGenerator tagGenerator;
+    std::list<DeliveryRecord> unacked;
+    sys::Mutex deliveryLock;
+    TxBuffer::shared_ptr txBuffer;
+    DtxBuffer::shared_ptr dtxBuffer;
+    bool dtxSelected;
+    AccumulatedAck accumulatedAck;
+    bool opened;
+    bool flowActive;
+
+    boost::shared_ptr<Exchange> cacheExchange;
+    
+    void route(Message::shared_ptr msg, Deliverable& strategy);
+    void record(const DeliveryRecord& delivery);
+    bool checkPrefetch(Message::shared_ptr& msg);
+    void checkDtxTimeout();
+    ConsumerImpl& find(const std::string& destination);
+    void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
+    void acknowledged(const DeliveryRecord&);
+
+    // FIXME aconway 2007-08-31: remove, temporary hack.
+    SemanticHandler* semanticHandler;
+    
+
   public:
     Session(SessionAdapter&, uint32_t timeout);
+    ~Session();
 
     /** Returns 0 if this session is not currently attached */
     SessionAdapter* getAdapter() { return adapter; }
     const SessionAdapter* getAdapter() const { return adapter; }
 
+    Broker& getBroker() const { return broker; }
+    
+    /** Session timeout. */
     uint32_t getTimeout() const { return timeout; }
     
-  private:
-    SessionAdapter* adapter;
-    uint32_t timeout;
-    boost::ptr_vector<framing::FrameHandler>  handlers;
+    /**
+     * Get named queue, never returns 0.
+     * @return: named queue or default queue for session if name=""
+     * @exception: ChannelException if no queue of that name is found.
+     * @exception: ConnectionException if name="" and session has no default.
+     */
+    Queue::shared_ptr getQueue(const std::string& name) const;
+    
+
+    void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+    Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
+    uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
+    uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
+
+    bool exists(const string& consumerTag);
+
+    /**
+     *@param tagInOut - if empty it is updated with the generated token.
+     */
+    void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, 
+                 bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
+
+    void cancel(const string& tag);
+
+    void setWindowMode(const std::string& destination);
+    void setCreditMode(const std::string& destination);
+    void addByteCredit(const std::string& destination, uint32_t value);
+    void addMessageCredit(const std::string& destination, uint32_t value);
+    void flush(const std::string& destination);
+    void stop(const std::string& destination);
+
+    bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
+    void close();
+    void startTx();
+    void commit(MessageStore* const store);
+    void rollback();
+    void selectDtx();
+    void startDtx(const std::string& xid, DtxManager& mgr, bool join);
+    void endDtx(const std::string& xid, bool fail);
+    void suspendDtx(const std::string& xid);
+    void resumeDtx(const std::string& xid);
+    void ackCumulative(DeliveryId deliveryTag);
+    void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
+    void recover(bool requeue);
+    void flow(bool active);
+    void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);            
+
+    void handle(Message::shared_ptr msg);
+
+    framing::AMQP_ClientProxy& getProxy() {
+        // FIXME aconway 2007-08-31: Move proxy to Session.
+        return semanticHandler->getProxy();
+    }    
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -19,27 +19,128 @@
  */
 
 #include "SessionAdapter.h"
+#include "Session.h"
+#include "Connection.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
 using namespace framing;
 
+// FIXME aconway 2007-08-31: the SessionAdapter should create its
+// private proxy directly on the connection's out handler.
+// Session/channel methods should not go through the other layers.
+// Need to get rid of ChannelAdapter and allow proxies to be created
+// directly on output handlers.
+//
+// Returns the client proxy of the attached session.
+// NOTE(review): dereferences session unconditionally, so it looks like
+// this must only be called while a session is attached -- confirm.
+framing::AMQP_ClientProxy& SessionAdapter::getProxy() {
+    return session->getProxy();
+}
+
+// Construct an adapter for channel ch on connection c. No Session is
+// created here; open() creates it. Points the `in` handler at this
+// adapter and `out` at the connection's output.
 SessionAdapter::SessionAdapter(Connection& c, ChannelId ch)
-    : connection(c), channel(ch)
+    : connection(c), channel(ch), ignoring(false)
 {
-    // FIXME aconway 2007-08-29: When we handle session commands,
-    // do this on open.
-    session.reset(new Session(*this, 0));
+    in = this;
+    out = &c.getOutput();
 }
 
 SessionAdapter::~SessionAdapter() {}
 
+namespace {
+// Class id / method id of a method body, used when reporting errors.
+// Both return 0 when the frame carried no method body (m == 0).
+// Each helper reads the accessor matching its own name; the two
+// accessors were previously swapped.
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+} // namespace
 
+// Entry point for frames arriving on this channel. Method frames are
+// first offered to this adapter's own method handlers via invoke(); any
+// frame not consumed that way is passed to the session's in-chain.
+// Exceptions are turned into a channel close (ChannelException) or a
+// connection close (ConnectionException / any other std::exception).
 void SessionAdapter::handle(AMQFrame& f) {
-    // FIXME aconway 2007-08-29: handle session commands here, forward
-    // other frames.
-    session->in(f);
+    // Note on channel states: a channel is open if session != 0.  A
+    // channel that is closed (session == 0) can be in the "ignoring"
+    // state. This is a temporary state after we have sent a channel
+    // exception, where extra frames might arrive that should be
+    // ignored.
+    // 
+    AMQMethodBody* m=f.getMethod();
+    try {
+        if (m && m->invoke(static_cast<Invocable*>(this)))
+            return;
+        else if (session)
+            session->in(f);
+        else if (!ignoring)
+            throw ChannelErrorException(
+                QPID_MSG("Channel " << channel << " is not open"));
+    } catch(const ChannelException& e){
+        // NOTE(review): getProxy() dereferences session, but the
+        // "is not open" exception above is thrown precisely when session
+        // is null -- this path looks like a null dereference; confirm.
+        getProxy().getChannel().close(
+            e.code, e.toString(), classId(m), methodId(m));
+        session.reset();
+        ignoring=true;          // Ignore trailing frames sent by client.
+    }catch(const ConnectionException& e){
+        connection.close(e.code, e.what(), classId(m), methodId(m));
+    }catch(const std::exception& e){
+        connection.close(
+            framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+    }
+}
+
+// Precondition check: the channel must have a session attached.
+// Throws ChannelErrorException naming the failed method otherwise.
+void SessionAdapter::assertOpen(const char* method) {
+    if (session) return;
+    throw ChannelErrorException(
+        QPID_MSG(""<<method<<" failed: No session for channel "
+                 << getChannel()));
+}
+
+// Precondition check: the channel must NOT have a session attached.
+// Throws PreconditionFailedException naming the failed method otherwise.
+// The message used to print the channel number twice ("<n> already open
+// on channel <n>"); it now says that a session is already open.
+void SessionAdapter::assertClosed(const char* method) {
+    // FIXME aconway 2007-08-31: Should raise channel-busy, need
+    // to update spec.
+    if (session)
+        throw PreconditionFailedException(
+            QPID_MSG(""<<method<<" failed: "
+                     << "session already open on channel "
+                     << getChannel()));
+}
+
+// Handle channel open: requires that no session is attached yet, creates
+// a new Session with timeout 0, then replies with open-ok. The session
+// must exist before the reply because getProxy() goes through it.
+void SessionAdapter::open(const string& /*outOfBand*/){
+    assertClosed("open");
+    session.reset(new Session(*this, 0));
+    getProxy().getChannel().openOk();
+} 
+
+// FIXME aconway 2007-08-31: flow is no longer in the spec.
+//
+// Handle channel flow: forward the new flow state to the session and
+// reply with flow-ok. handle() invokes this regardless of whether a
+// session is attached, so the open check is needed to avoid dereferencing
+// a null session; assertOpen throws ChannelErrorException in that case.
+void SessionAdapter::flow(bool active){
+    assertOpen("flow");
+    session->flow(active);
+    getProxy().getChannel().flowOk(active);
+}
+
+// Handle flow-ok from the peer: intentionally a no-op.
+void SessionAdapter::flowOk(bool /*active*/){}
+        
+// Handle a close request from the peer: log it, leave the "ignoring"
+// state, reply with close-ok and drop the session.
+// NOTE(review): getProxy() dereferences session; if close arrives while
+// no session is attached this looks like a null dereference -- confirm.
+void SessionAdapter::close(uint16_t replyCode,
+                           const string& replyText,
+                           uint16_t classId, uint16_t methodId)
+{
+    // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids
+    // to text names.
+    QPID_LOG(warning, "Received session.close("<<replyCode<<","
+             <<replyText << ","
+             << "classid=" <<classId<< ","
+             << "methodid=" <<methodId);
+    ignoring=false;
+    getProxy().getChannel().closeOk();
+    // FIXME aconway 2007-08-31: should reset session BEFORE
+    // sending closeOK to avoid races. SessionAdapter
+    // needs its own private proxy, see getProxy() above.
+    session.reset();
+    // No need to remove from connection map, will be re-used
+    // if channel is re-opened.
+} 
+        
+// Handle close-ok from the peer: clears the ignoring flag that handle()
+// sets after sending a channel exception.
+void SessionAdapter::closeOk(){
+    ignoring=false;
 }
 
+// Handle an ok method from the peer: intentionally empty.
+void SessionAdapter::ok() 
+{
+    //no specific action required, generic response handling should be
+    //sufficient
+}
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Aug 31 13:51:22 2007
@@ -23,10 +23,15 @@
  */
 
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/broker/Session.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/amqp_types.h"
 
 namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
 namespace broker {
 
 class Connection;
@@ -39,7 +44,10 @@
  *
  * SessionAdapters can be stored in a map by value.
  */
-class SessionAdapter : public framing::FrameHandler
+class SessionAdapter :
+        public framing::FrameHandler::Chains,
+        private framing::FrameHandler,
+        private framing::AMQP_ServerOperations::ChannelHandler
 {
   public:
     SessionAdapter(Connection&, framing::ChannelId);
@@ -56,11 +64,29 @@
     framing::ChannelId getChannel() const { return channel; }
     Connection& getConnection() { return connection; }
     const Connection& getConnection() const { return connection; }
-    
+
   private:
+    void assertOpen(const char* method);
+    void assertClosed(const char* method);
+
+    framing::AMQP_ClientProxy& getProxy();
+    
+    // FIXME aconway 2007-08-31: Replace channel commands with session.
+    void open(const std::string& outOfBand); 
+    void flow(bool active); 
+    void flowOk(bool active); 
+    void ok(  );
+    void ping(  );
+    void pong(  );
+    void resume( const std::string& channelId );
+    void close(uint16_t replyCode, const
+               std::string& replyText, uint16_t classId, uint16_t methodId); 
+    void closeOk(); 
+    
     Connection& connection;
     const framing::ChannelId channel;
     shared_ptr<Session> session;
+    bool ignoring;
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Fri Aug 31 13:51:22 2007
@@ -25,7 +25,6 @@
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/broker/BrokerAdapter.h"
 #include "qpid/broker/Connection.h"
-#include "qpid/broker/BrokerChannel.h"
 #include "qpid/framing/ChannelAdapter.h"
 
 #include <boost/utility/in_place_factory.hpp>
@@ -38,13 +37,13 @@
 using namespace broker;
 
 /** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct SessionManager::BrokerHandler :
-        public FrameHandler, private ChannelAdapter, private DeliveryAdapter
+struct SessionManager::BrokerHandler : public FrameHandler, private ChannelAdapter
 {
     Connection connection;
-    Channel channel;
+    SessionAdapter sessionAdapter;
+    broker::Session session;
     BrokerAdapter adapter;
-
+    
     // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
     // Connection, ChannelAdapter) As these classes are untangled the
     // flab can be reduced. The real requirements are:
@@ -55,8 +54,9 @@
     // 
     BrokerHandler(Broker& broker) :
         connection(0, broker),
-        channel(connection, *this, 1),
-        adapter(channel, connection, broker, *this) {}
+        sessionAdapter(connection, 0),
+        session(sessionAdapter, 1),
+        adapter(session, static_cast<ChannelAdapter&>(*this)) {}
 
     void handle(AMQFrame& frame) {
         AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.getBody());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Fri Aug 31 13:51:22 2007
@@ -40,4 +40,7 @@
+// Write a short description of this content body to out: its size in
+// bytes and, unless NDEBUG is defined, the result of data.substr(0,10)
+// appended directly after it.
 void qpid::framing::AMQContentBody::print(std::ostream& out) const
 {
     out << "content (" << size() << " bytes)";
+#ifndef NDEBUG
+    out << data.substr(0,10);
+#endif
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Fri Aug 31 13:51:22 2007
@@ -52,7 +52,10 @@
     void setChannel(ChannelId c) { channel = c; }
 
     AMQBody* getBody();
-    const AMQBody* getBody() const;    
+    const AMQBody* getBody() const;
+
+    /**
+     * Shortcut for getBody()->getMethod().
+     * NOTE(review): getBody() is dereferenced without a null check --
+     * confirm it can never return 0 for a frame reaching this call.
+     */
+    AMQMethodBody* getMethod() { return getBody()->getMethod(); }
+    const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
 
     /** Copy a body instance to the frame */
     void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp Fri Aug 31 13:51:22 2007
@@ -36,7 +36,7 @@
 
 ChannelAdapter::ChannelAdapter() : handler(*this), id(0) {}
 
-void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) 
+void ChannelAdapter::init(ChannelId i, FrameHandler& out, ProtocolVersion v) 
 {
     assertChannelNotOpen();
     id = i;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Fri Aug 31 13:51:22 2007
@@ -29,13 +29,10 @@
 #include "ProtocolVersion.h"
 #include "amqp_types.h"
 #include "FrameHandler.h"
-#include "OutputHandler.h"
 
 namespace qpid {
 namespace framing {
 
-class OutputHandler;
-
 /**
  * Base class for client and broker channels.
  *
@@ -59,7 +56,7 @@
     virtual ~ChannelAdapter() {}
 
     /** Initialize the channel adapter. */
-    void init(ChannelId, OutputHandler&, ProtocolVersion);
+    void init(ChannelId, FrameHandler&, ProtocolVersion);
 
     FrameHandler::Chains& getHandlers() { return handlers; }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Fri Aug 31 13:51:22 2007
@@ -18,6 +18,11 @@
  * under the License.
  *
  */
+
+// FIXME aconway 2007-08-30: Rewrite as a Session test.
+// There is an issue with the test's use of DeliveryAdapter
+// which is no longer exposed on Session (part of SemanticHandler.)
+// 
 #include "qpid/broker/BrokerChannel.h"
 #include "qpid/broker/BrokerQueue.h"
 #include "qpid/broker/FanOutExchange.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=571575&r1=571574&r2=571575&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Aug 31 13:51:22 2007
@@ -28,8 +28,8 @@
 
 TESTS+=Blob
 check_PROGRAMS+=Blob
-Blob_SOURCES=Blob.cpp ../qpid/framing/Blob.cpp
-Blob_LDADD=-lboost_unit_test_framework
+Blob_SOURCES=Blob.cpp 
+Blob_LDADD=-lboost_unit_test_framework $(lib_common)
 
 TESTS+=logging
 check_PROGRAMS+=logging
@@ -80,7 +80,6 @@
 # Unit tests
 broker_unit_tests =	\
   AccumulatedAckTest	\
-  BrokerChannelTest 	\
   DtxWorkRecordTest     \
   ExchangeTest		\
   HeadersExchangeTest	\