You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/17 10:28:53 UTC

svn commit: r556846 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/framing/ tests/

Author: gsim
Date: Tue Jul 17 01:28:48 2007
New Revision: 556846

URL: http://svn.apache.org/viewvc?view=rev&rev=556846
Log:
Some refactoring towards a more decoupled handler chain structure:

* Connection no longer depends on Channel; it contains a map of
  FrameHandler::Chains. (The construction of the chains still refers
  to specific handlers).

* Channel is no longer tied to ChannelAdapter through inheritance. The
  former is independent of any particular handler chain or protocol
  version, the latter is still used by ConnectionAdapter and
  SemanticHandler in the 0-9 chain.

* A DeliveryAdapter interface has been introduced as part of the
  separation of ChannelAdapter from Channel. This is intended to adapt
  from a version independent core to version specific mechanisms for
  sending messages. i.e. it fulfills the same role for outputs that
  e.g. BrokerAdapter does for inputs. (Its not perfect yet by any
  means but is a step on the way to the correct model I think).

* The connection related methods sent over channel zero are
  implemented in their own adapter (ConnectionAdapter), and are
  entirely separate from the semantic layer. The channel control
  methods are still bundled with the proper semantic layer methods;
  they too can be separated but would have to share the request id
  with the semantic method handler due to the nature of the 0-9 WIP.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul 17 01:28:48 2007
@@ -197,6 +197,7 @@
   qpid/broker/Connection.cpp \
   qpid/broker/ConnectionAdapter.cpp \
   qpid/broker/ConnectionFactory.cpp \
+  qpid/broker/ConsumeAdapter.cpp \
   qpid/broker/Daemon.cpp \
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliveryRecord.cpp \
@@ -209,6 +210,7 @@
   qpid/broker/DtxWorkRecord.cpp \
   qpid/broker/ExchangeRegistry.cpp \
   qpid/broker/FanOutExchange.cpp \
+  qpid/broker/GetAdapter.cpp \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/InMemoryContent.cpp \
   qpid/broker/LazyLoadedContent.cpp \
@@ -224,6 +226,7 @@
   qpid/broker/RecoveredEnqueue.cpp \
   qpid/broker/RecoveredDequeue.cpp \
   qpid/broker/Reference.cpp \
+  qpid/broker/SemanticHandler.cpp \
   qpid/broker/Timer.cpp \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TxAck.cpp \
@@ -253,9 +256,11 @@
   qpid/broker/BrokerMessageBase.h \
   qpid/broker/BrokerQueue.h \
   qpid/broker/CompletionHandler.h \
+  qpid/broker/ConsumeAdapter.h \
   qpid/broker/Consumer.h \
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.h \
+  qpid/broker/DeliveryAdapter.h \
   qpid/broker/DirectExchange.h \
   qpid/broker/DtxAck.h \
   qpid/broker/DtxBuffer.h \
@@ -265,6 +270,7 @@
   qpid/broker/DtxWorkRecord.h \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/FanOutExchange.h \
+  qpid/broker/GetAdapter.h \
   qpid/broker/HandlerImpl.h \
   qpid/broker/InMemoryContent.h \
   qpid/broker/MessageBuilder.h \
@@ -306,6 +312,7 @@
   qpid/broker/PersistableQueue.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/RecoveryManagerImpl.h \
+  qpid/broker/SemanticHandler.h \
   qpid/broker/Timer.h \
   qpid/broker/TopicExchange.h \
   qpid/broker/TransactionalStore.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Jul 17 01:28:48 2007
@@ -20,6 +20,8 @@
 #include "BrokerAdapter.h"
 #include "BrokerChannel.h"
 #include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
 #include "qpid/framing/AMQMethodBody.h"
 #include "qpid/Exception.h"
 
@@ -33,8 +35,8 @@
 typedef std::vector<Queue::shared_ptr> QueueVector;
 
 
-BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
-    CoreRefs(ch, c, b),
+    BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
+    CoreRefs(ch, c, b, a),
     connection(c),
     basicHandler(*this),
     channelHandler(*this),
@@ -299,9 +301,11 @@
     if(!consumerTag.empty() && channel.exists(consumerTag)){
         throw ConnectionException(530, "Consumer tags must be unique");
     }
-
     string newTag = consumerTag;
-    channel.consume(
+    //need to generate name here, so we have it for the adapter (it is
+    //also version specific behaviour now)
+    if (newTag.empty()) newTag = tagGenerator.generate();
+    channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
         newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
 
     if(!nowait) client.consumeOk(newTag, context.getRequestId());
@@ -336,7 +340,8 @@
         
 void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
     Queue::shared_ptr queue = getQueue(queueName);    
-    if(!channel.get(queue, "", !noAck)){
+    GetAdapter out(adapter, queue, "", connection.getFrameMax());
+    if(!channel.get(out, queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
 
         client.getEmpty(clusterId, context.getRequestId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Jul 17 01:28:48 2007
@@ -56,7 +56,7 @@
 class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
 {
   public:
-    BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+    BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a);
 
     framing::ProtocolVersion getVersion() const;
     ChannelHandler* getChannelHandler() { return &channelHandler; }
@@ -172,8 +172,10 @@
         public BasicHandler,
         public HandlerImpl<framing::AMQP_ClientProxy::Basic>
     {
+        NameGenerator tagGenerator;
+
       public:
-        BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+        BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
 
         void qos(const framing::MethodContext& context, uint32_t prefetchSize,
                  uint16_t prefetchCount, bool global); 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jul 17 01:28:48 2007
@@ -28,7 +28,6 @@
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
-#include "qpid/framing/ChannelAdapter.h"
 #include "qpid/QpidError.h"
 
 #include "BrokerAdapter.h"
@@ -50,8 +49,8 @@
 using namespace qpid::sys;
 
 
-Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
-    ChannelAdapter(),
+Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+    id(_id),
     connection(con),
     currentDeliveryTag(1),
     prefetchSize(0),
@@ -62,10 +61,8 @@
     store(_store),
     messageBuilder(this, _store, connection.getStagingThreshold()),
     opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
-    flowActive(true),
-    adapter(new BrokerAdapter(*this, con, con.broker))
+    flowActive(true)
 {
-    init(id, con.getOutput(), con.getVersion());
     outstanding.reset();
 }
 
@@ -79,14 +76,15 @@
 
 // TODO aconway 2007-02-12: Why is connection token passed in instead
 // of using the channel's parent connection?
-void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, 
+                      Queue::shared_ptr queue, bool acks,
                       bool exclusive, ConnectionToken* const connection,
                       const FieldTable*)
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
     std::auto_ptr<ConsumerImpl> c(
-        new ConsumerImpl(this, tagInOut, queue, connection, acks));
+    new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
     queue->consume(c.get(), exclusive);//may throw exception
     consumers.insert(tagInOut, c.release());
 }
@@ -195,22 +193,10 @@
     }
 }
 
-void Channel::deliver(
-    Message::shared_ptr& msg, const string& consumerTag,
-    Queue::shared_ptr& queue, bool ackExpected)
+void Channel::record(const DeliveryRecord& delivery)
 {
-    Mutex::ScopedLock locker(deliveryLock);
-
-	// Key the delivered messages to the id of the request in which they're sent 
-    uint64_t deliveryTag = getNextSendRequestId();
-    
-    if(ackExpected){
-        unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
-        outstanding.size += msg->contentSize();
-        outstanding.count++;
-    }
-    //send deliver method, header and content(s)
-    msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
+    unacked.push_back(delivery);
+    delivery.addTo(&outstanding);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -220,11 +206,11 @@
     return countOk && sizeOk;
 }
 
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, 
-    Queue::shared_ptr _queue, 
-    ConnectionToken* const _connection, bool ack
-) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
-    ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, 
+                                    const string& _tag, Queue::shared_ptr _queue, 
+                                    ConnectionToken* const _connection, bool ack
+                                    ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+                                        ackExpected(ack), blocked(false) {}
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
     if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -232,13 +218,25 @@
             blocked = true;
         }else{
             blocked = false;
-            parent->deliver(msg, tag, queue, ackExpected);
+            Mutex::ScopedLock locker(parent->deliveryLock);
+
+            uint64_t deliveryTag = adapter->getNextDeliveryTag();    
+            if(ackExpected){
+                parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+            }
+            adapter->deliver(msg, deliveryTag);
+
             return true;
         }
     }
     return false;
 }
 
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+    Mutex::ScopedLock locker(parent->deliveryLock);
+    adapter->deliver(msg, deliveryTag);
+}
+
 Channel::ConsumerImpl::~ConsumerImpl() {
     cancel();
 }
@@ -298,10 +296,6 @@
     }
 }
 
-void Channel::ack(){
-    ack(getFirstAckRequest(), getLastAckRequest());
-}
-
 // Used by Basic
 void Channel::ack(uint64_t deliveryTag, bool multiple){
     if (multiple)
@@ -365,15 +359,12 @@
     }
 }
 
-bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
     Message::shared_ptr msg = queue->dequeue();
     if(msg){
         Mutex::ScopedLock locker(deliveryLock);
-        uint64_t myDeliveryTag = getNextSendRequestId();
-        msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
-        			   destination,
-                       queue->getMessageCount() + 1, myDeliveryTag,
-                       connection.getFrameMax());
+        uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
+        adapter.deliver(msg, myDeliveryTag);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -386,33 +377,9 @@
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
                       uint64_t deliveryTag)
 {
-    msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
-}
-
-void Channel::handleMethodInContext(
-    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
-    const MethodContext& context
-)
-{
-    try{
-        if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
-            if (!method->isA<ChannelCloseOkBody>()) {
-                std::stringstream out;
-                out << "Attempt to use unopened channel: " << getId();
-                throw ConnectionException(504, out.str());
-            }
-        } else {
-            method->invoke(*adapter, context);
-        }
-    }catch(ChannelException& e){
-        adapter->getProxy().getChannel().close(
-            e.code, e.toString(),
-            method->amqpClassId(), method->amqpMethodId());
-        connection.closeChannel(getId());
-    }catch(ConnectionException& e){
-        connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
-    }catch(std::exception& e){
-        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+    ConsumerImplMap::iterator i = consumers.find(consumerTag);
+    if (i != consumers.end()){
+        i->redeliver(msg, deliveryTag);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Tue Jul 17 01:28:48 2007
@@ -23,6 +23,7 @@
  */
 
 #include <list>
+#include <memory>
 
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
@@ -30,6 +31,7 @@
 
 #include "AccumulatedAck.h"
 #include "Consumer.h"
+#include "DeliveryAdapter.h"
 #include "DeliveryRecord.h"
 #include "DtxBuffer.h"
 #include "DtxManager.h"
@@ -37,6 +39,7 @@
 #include "NameGenerator.h"
 #include "Prefetch.h"
 #include "TxBuffer.h"
+#include "qpid/framing/amqp_types.h"
 #include "qpid/framing/ChannelAdapter.h"
 #include "qpid/framing/ChannelOpenBody.h"
 #include "CompletionHandler.h"
@@ -55,12 +58,12 @@
  * Maintains state for an AMQP channel. Handles incoming and
  * outgoing messages for that channel.
  */
-class Channel : public framing::ChannelAdapter,
-                public CompletionHandler
+class Channel : public CompletionHandler
 {
     class ConsumerImpl : public Consumer
     {
         Channel* parent;
+        std::auto_ptr<DeliveryAdapter> adapter;
         const string tag;
         Queue::shared_ptr queue;
         ConnectionToken* const connection;
@@ -68,17 +71,19 @@
         bool blocked;
 
       public:
-        ConsumerImpl(Channel* parent, const string& tag,
-                     Queue::shared_ptr queue,
+        ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, 
+                     const string& tag, Queue::shared_ptr queue,
                      ConnectionToken* const connection, bool ack);
         ~ConsumerImpl();
-        virtual bool deliver(Message::shared_ptr& msg);            
+        bool deliver(Message::shared_ptr& msg);            
+        void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
         void cancel();
         void requestDispatch();
     };
 
     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
 
+    framing::ChannelId id;
     Connection& connection;
     uint64_t currentDeliveryTag;
     Queue::shared_ptr defaultQueue;
@@ -97,15 +102,10 @@
     MessageBuilder messageBuilder;//builder for in-progress message
     bool opened;
     bool flowActive;
-    boost::scoped_ptr<BrokerAdapter> adapter;
-
-	// completion handler for MessageBuilder
-    void complete(Message::shared_ptr msg);
-    
-    void deliver(Message::shared_ptr& msg, const string& tag,
-                 Queue::shared_ptr& queue, bool ackExpected);            
+	
+    void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
+    void record(const DeliveryRecord& delivery);
     bool checkPrefetch(Message::shared_ptr& msg);
-
     void checkDtxTimeout();
         
   public:
@@ -113,7 +113,7 @@
     ~Channel();
 
     bool isOpen() const { return opened; }
-    BrokerAdapter& getAdapter() { return *adapter; }
+    framing::ChannelId getId() const { return id; }
     
     void open() { opened = true; }
     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -126,11 +126,11 @@
     /**
      *@param tagInOut - if empty it is updated with the generated token.
      */
-    void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+    void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
                  bool exclusive, ConnectionToken* const connection = 0,
                  const framing::FieldTable* = 0);
     void cancel(const string& tag);
-    bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+    bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
     void close();
     void startTx();
     void commit();
@@ -140,7 +140,6 @@
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
-    void ack();
     void ack(uint64_t deliveryTag, bool multiple);
     void ack(uint64_t deliveryTag, uint64_t endTag);
     void recover(bool requeue);
@@ -152,11 +151,6 @@
     void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
     
     void handleInlineTransfer(Message::shared_ptr msg);
-    
-    // For ChannelAdapter
-    void handleMethodInContext(
-        boost::shared_ptr<framing::AMQMethodBody> method,
-        const framing::MethodContext& context);
 };
 
 }} // namespace broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h Tue Jul 17 01:28:48 2007
@@ -103,7 +103,7 @@
      * Used to return a message in response to a get from a queue
      */
     virtual void sendGetOk(const framing::MethodContext& context,
-    					   const std::string& destination,
+                           const std::string& destination,
                            uint32_t messageCount,
                            uint64_t deliveryTag, 
                            uint32_t framesize) = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jul 17 01:28:48 2007
@@ -26,6 +26,7 @@
 #include "BrokerChannel.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "BrokerAdapter.h"
+#include "SemanticHandler.h"
 
 using namespace boost;
 using namespace qpid::sys;
@@ -55,7 +56,7 @@
     if (frame.getChannel() == 0) {
         adapter.handle(frame);
     } else {
-        getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+        getChannel((frame.getChannel())).in->handle(frame);
     }
 }
 
@@ -92,17 +93,17 @@
 
 void Connection::closeChannel(uint16_t id) {
     ChannelMap::iterator i = channels.find(id);
-    if (i != channels.end())
-        i->close();
+    if (i != channels.end()) channels.erase(i);
 }
 
 
-Channel& Connection::getChannel(ChannelId id) {
+FrameHandler::Chains& Connection::getChannel(ChannelId id) {
     ChannelMap::iterator i = channels.find(id);
     if (i == channels.end()) {
-        i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first;
+        FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+        i = channels.insert(ChannelMap::value_type(id, chains)).first;
     }        
-    return *i;
+    return i->second;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jul 17 01:28:48 2007
@@ -51,7 +51,7 @@
     Connection(sys::ConnectionOutputHandler* out, Broker& broker);
 
     /** Get a channel. Create if it does not already exist */
-    Channel& getChannel(framing::ChannelId channel);
+    framing::FrameHandler::Chains& getChannel(framing::ChannelId channel);
 
     /** Close a channel */
     void closeChannel(framing::ChannelId channel);
@@ -82,7 +82,7 @@
     void closed();
 
   private:
-    typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
+    typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
 
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
     Exchange::shared_ptr findExchange(const string& name);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp Tue Jul 17 01:28:48 2007
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConsumeAdapter.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+
+ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {}
+
+RequestId ConsumeAdapter::getNextDeliveryTag()
+{
+    return adapter.getNextSendRequestId();
+}
+
+void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag)
+{
+    msg->deliver(adapter, tag, deliveryTag, framesize);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h Tue Jul 17 01:28:48 2007
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConsumeAdapter_
+#define _ConsumeAdapter_
+
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+    class ConsumeAdapter : public DeliveryAdapter
+    {
+        framing::ChannelAdapter& adapter;
+        const std::string tag;
+        const uint32_t framesize;
+    public:
+        ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize);
+        framing::RequestId getNextDeliveryTag();
+        void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+    };
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConsumeAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Tue Jul 17 01:28:48 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryAdapter_
+#define _DeliveryAdapter_
+
+#include "BrokerMessageBase.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+    /**
+     * The intention behind this interface is to separate the generic
+     * handling of some form of message delivery to clients that is
+     * contained in the version independent Channel class from the
+     * details required for a particular situation or
+     * version. i.e. where the existing adapters allow (through
+     * supporting the generated interface for a version of the
+     * protocol) inputs of a channel to be adapted to the version
+     * independent part, this does the same for the outputs.
+     */
+    class DeliveryAdapter
+    {
+    public:
+        virtual framing::RequestId getNextDeliveryTag() = 0;
+        virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0;
+        virtual ~DeliveryAdapter(){}
+    };
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp Tue Jul 17 01:28:48 2007
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "GetAdapter.h"
+#include "qpid/framing/MethodContext.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+using qpid::framing::MethodContext;
+
+GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f) 
+    : adapter(a), queue(q), destination(d), framesize(f) {}
+
+RequestId GetAdapter::getNextDeliveryTag()
+{
+    return adapter.getNextSendRequestId();
+}
+
+void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
+{
+    msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination,
+                   queue->getMessageCount(), deliveryTag, framesize);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h Tue Jul 17 01:28:48 2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _GetAdapter_
+#define _GetAdapter_
+
+#include "BrokerQueue.h"
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+    class GetAdapter : public DeliveryAdapter
+    {
+        framing::ChannelAdapter& adapter;
+        Queue::shared_ptr queue;
+        const std::string destination;
+        const uint32_t framesize;
+    public:
+        GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize);
+        ~GetAdapter(){}
+        framing::RequestId getNextDeliveryTag();
+        void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+    };
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/GetAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Tue Jul 17 01:28:48 2007
@@ -40,12 +40,13 @@
  */
 struct CoreRefs
 {
-    CoreRefs(Channel& ch, Connection& c, Broker& b)
-        : channel(ch), connection(c), broker(b), proxy(ch) {}
+    CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
+        : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {}
 
     Channel& channel;
     Connection& connection;
     Broker& broker;
+    framing::ChannelAdapter& adapter;
     framing::AMQP_ClientProxy proxy;
 
     /**

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Tue Jul 17 01:28:48 2007
@@ -21,6 +21,8 @@
 #include "BrokerChannel.h"
 #include "qpid/framing/FramingContent.h"
 #include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
 #include "Broker.h"
 #include "BrokerMessageMessage.h"
 #include "qpid/framing/MessageAppendBody.h"
@@ -127,7 +129,7 @@
     if(!destination.empty() && channel.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
     string tag = destination;
-    channel.consume(
+    channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
         tag, queue, !noAck, exclusive,
         noLocal ? &connection : 0, &filter);
     client.ok(context.getRequestId());
@@ -144,7 +146,8 @@
 {
     Queue::shared_ptr queue = getQueue(queueName);
     
-    if(channel.get(queue, destination, !noAck))
+    GetAdapter out(adapter, queue, destination, connection.getFrameMax());
+    if(channel.get(out, queue, !noAck))
         client.ok(context.getRequestId());
     else 
         client.empty(context.getRequestId());
@@ -162,7 +165,7 @@
 void
 MessageHandlerImpl::ok(const MethodContext& /*context*/)
 {
-    channel.ack();
+    channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
 }
 
 void
@@ -190,7 +193,7 @@
                            uint16_t /*code*/,
                            const string& /*text*/ )
 {
-    channel.ack();
+    //channel.ack();
     // channel.requeue();
 }
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Jul 17 01:28:48 2007
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SemanticHandler.h"
+#include "BrokerAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : 
+    connection(c),
+    channel(c, id, &c.broker.getStore())
+{
+    init(id, connection.getOutput(), connection.getVersion());
+    adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+}
+
+
+void SemanticHandler::handle(framing::AMQFrame& frame) 
+{
+    handleBody(frame.getBody());
+}
+
+//ChannelAdapter virtual methods:
+void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
+                                            const qpid::framing::MethodContext& context)
+{
+    try{
+        if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+            if (!method->isA<ChannelCloseOkBody>()) {
+                std::stringstream out;
+                out << "Attempt to use unopened channel: " << getId();
+                throw ConnectionException(504, out.str());
+            }
+        } else {
+            method->invoke(*adapter, context);
+        }
+    }catch(ChannelException& e){
+        adapter->getProxy().getChannel().close(
+            e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+        connection.closeChannel(getId());
+    }catch(ConnectionException& e){
+        connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+    }
+
+}
+
+bool SemanticHandler::isOpen() const 
+{ 
+    return channel.isOpen(); 
+}
+
+void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body) 
+{
+    channel.handleHeader(body);
+}
+
+void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body) 
+{
+    channel.handleContent(body);
+}
+
+void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body) 
+{
+    channel.handleHeartbeat(body);
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?view=auto&rev=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Jul 17 01:28:48 2007
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SemanticHandler_
+#define _SemanticHandler_
+
+#include <memory>
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace broker {
+
+class BrokerAdapter;
+class framing::ChannelAdapter;
+
+class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+    Connection& connection;
+    Channel channel;
+    std::auto_ptr<BrokerAdapter> adapter;
+
+    //ChannelAdapter virtual methods:
+    void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
+                               const qpid::framing::MethodContext& context);
+    bool isOpen() const;
+    void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+    void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+    void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+public:
+    SemanticHandler(framing::ChannelId id, Connection& c);
+    void handle(framing::AMQFrame& frame);
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Tue Jul 17 01:28:48 2007
@@ -52,7 +52,7 @@
  * Thread safety: OBJECT UNSAFE. Instances must not be called
  * concurrently. AMQP defines channels to be serialized.
  */
-class ChannelAdapter : private BodyHandler {
+class ChannelAdapter : protected BodyHandler {
   public:
     /**
      *@param output Processed frames are forwarded to this handler.
@@ -84,6 +84,10 @@
 
     virtual bool isOpen() const = 0;
     
+    RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+    RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
+    RequestId getNextSendRequestId() { return requester.getNextId(); }
+
   protected:
     void assertMethodOk(AMQMethodBody& method) const;
     void assertChannelOpen() const;
@@ -93,13 +97,9 @@
         shared_ptr<AMQMethodBody> method,
         const MethodContext& context) = 0;
 
-    RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
-    RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
-    RequestId getNextSendRequestId() { return requester.getNextId(); }
-
   private:
     class ChannelAdapterHandler;
-  friend class ChannelAdapterHandler;
+    friend class ChannelAdapterHandler;
     
     void handleMethod(shared_ptr<AMQMethodBody>);
     void handleRequest(shared_ptr<AMQRequestBody>);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?view=diff&rev=556846&r1=556845&r2=556846
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Tue Jul 17 01:28:48 2007
@@ -48,13 +48,38 @@
     void close() {};
 };
 
+struct DeliveryRecorder
+{
+    typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+    std::vector<Delivery> delivered;
+
+    struct Adapter : DeliveryAdapter 
+    {
+        RequestId id;
+        DeliveryRecorder& recorder;
+
+        Adapter(DeliveryRecorder& r) : recorder(r) {}
+
+        RequestId getNextDeliveryTag() { return id + 1; }
+        void deliver(Message::shared_ptr& msg, RequestId tag) 
+        {
+            recorder.delivered.push_back(Delivery(msg, tag));
+            id++; 
+        }
+
+    };
+
+    std::auto_ptr<DeliveryAdapter> createAdapter()
+    {
+        return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+    }
+};
 
 class BrokerChannelTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(BrokerChannelTest);
-    CPPUNIT_TEST(testConsumerMgmt);
+    CPPUNIT_TEST(testConsumerMgmt);;
     CPPUNIT_TEST(testDeliveryNoAck);
-    CPPUNIT_TEST(testDeliveryAndRecovery);
     CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST(testQueuePolicy);
     CPPUNIT_TEST(testFlow);
@@ -160,11 +185,12 @@
 
         ConnectionToken* owner = 0;
         string tag("my_consumer");
-        channel.consume(tag, queue, false, false, owner);
+        std::auto_ptr<DeliveryAdapter> unused;
+        channel.consume(unused, tag, queue, false, false, owner);
         string tagA;
         string tagB;
-        channel.consume(tagA, queue, false, false, owner);
-        channel.consume(tagB, queue, false, false, owner);
+        channel.consume(unused, tagA, queue, false, false, owner);
+        channel.consume(unused, tagB, queue, false, false, owner);
         CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount());
         CPPUNIT_ASSERT(channel.exists("my_consumer"));
         CPPUNIT_ASSERT(channel.exists(tagA));
@@ -178,65 +204,17 @@
         CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount());        
     }
 
-    void testDeliveryNoAck(){
-        Channel channel(connection, 7);
-        channel.open();
-        const string data("abcdefghijklmn");
-        Message::shared_ptr msg(
-            createMessage("test", "my_routing_key", "my_message_id", 14));
-        addContent(msg, data);
-        Queue::shared_ptr queue(new Queue("my_queue"));
-        ConnectionToken* owner(0);
-        string tag("no_ack");
-        channel.consume(tag, queue, false, false, owner);
-
-        queue->deliver(msg);
-        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
-        CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
-                           handler.frames[0].getBody().get()));
-        CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
-                           handler.frames[1].getBody().get()));
-        CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
-                           handler.frames[2].getBody().get()));
-        AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
-            handler.frames[3].getBody().get());
-        CPPUNIT_ASSERT(contentBody);
-        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
-    }
-
-    void testDeliveryAndRecovery(){
+    void testDeliveryNoAck(){        
         Channel channel(connection, 7);
-        channel.open();
-        const string data("abcdefghijklmn");
-
         Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
-        addContent(msg, data);
-
         Queue::shared_ptr queue(new Queue("my_queue"));
-        ConnectionToken* owner(0);
-        string tag("ack");
-        channel.consume(tag, queue, true, false, owner);
-
+        DeliveryRecorder recorder;
+        string tag("test");
+        channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
         queue->deliver(msg);
-        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
-        CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
-        CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
-                           handler.frames[0].getBody().get()));
-        CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
-                           handler.frames[1].getBody().get()));
-        CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
-                           handler.frames[2].getBody().get()));
-        AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
-            handler.frames[3].getBody().get());
-        CPPUNIT_ASSERT(contentBody);
-        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+        CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
     }
 
     void testStaging(){
@@ -349,26 +327,18 @@
         Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
         addContent(msg, data);
         Queue::shared_ptr queue(new Queue("my_queue"));
-        ConnectionToken* owner(0);
-        string tag("no_ack");
-        channel.consume(tag, queue, false, false, owner);
+        DeliveryRecorder recorder;
+        string tag("test");
+        channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
         channel.flow(false);
         queue->deliver(msg);
-        //ensure no more frames have been delivered
-        CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
-        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());        
+        //ensure no messages have been delivered
+        CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
+
         channel.flow(true);
-        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
-        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());        
-        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());        
-        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
-        BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
-        AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
-        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
-        CPPUNIT_ASSERT(deliver);
-        CPPUNIT_ASSERT(contentHeader);
-        CPPUNIT_ASSERT(contentBody);
-        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+        //ensure no messages have been delivered
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+        CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
     }
 
     Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)