You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/06 22:27:36 UTC

svn commit: r573359 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/ src/qpid/broker/ src/qpid/client/ src/qpid/framing/ src/tests/

Author: gsim
Date: Thu Sep  6 13:27:33 2007
New Revision: 573359

URL: http://svn.apache.org/viewvc?rev=573359&view=rev
Log:
Implementation of execution.result on the client side 


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
      - copied, changed from r572247, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
      - copied, changed from r572394, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
    incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Thu Sep  6 13:27:33 2007
@@ -12,8 +12,18 @@
     @classname="Session"
   end
   
+  def return_type(m)
+    if (m.result)
+      return "TypedResult<qpid::framing::#{m.result.cpptype.ret}>"
+    elsif (not m.responses().empty?)
+      return "Response"
+    else
+      return "Completion"
+    end 
+  end
+
   def declare_method (m)
-    gen "Response #{m.parent.name.lcaps}#{m.name.caps}(" 
+    gen "#{return_type(m)} #{m.parent.name.lcaps}#{m.name.caps}(" 
     if (m.content())
       params=m.signature + ["const MethodContent& content"]
     else
@@ -28,7 +38,7 @@
   end
 
   def define_method (m)
-    gen "Response Session::#{m.parent.name.lcaps}#{m.name.caps}(" 
+    gen "#{return_type(m)} Session::#{m.parent.name.lcaps}#{m.name.caps}(" 
     if (m.content())
       params=m.signature + ["const MethodContent& content"]
     else
@@ -37,19 +47,15 @@
     indent { gen params.join(",\n") }
     gen "){\n\n"
     indent (2) { 
-      gen "return impl->send(#{m.body_name}(" 
+      gen "return #{return_type(m)}(impl()->send(#{m.body_name}(" 
       params = ["version"] + m.param_names
       gen params.join(", ")
       other_params=[]
       if (m.content())
-        other_params << "content"
-      end
-      if m.responses().empty?
-        other_params << "false"
-      else 
-        other_params << "true"
+        gen "), content), impl());\n"
+      else
+        gen ")), impl());\n"
       end
-      gen "), #{other_params.join(", ")});\n"
     }
     gen "}\n\n"
   end
@@ -65,11 +71,13 @@
       gen <<EOS
 #include <sstream> 
 #include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/amqp_structs.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/MethodContent.h"
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Response.h"
-#include "qpid/client/SessionCore.h"
+#include "qpid/client/ScopedAssociation.h"
+#include "qpid/client/TypedResult.h"
 
 namespace qpid {
 namespace client {
@@ -81,16 +89,20 @@
 using framing::SequenceNumberSet;
 
 class #{@classname} {
-  ConnectionImpl::shared_ptr parent;
-  SessionCore::shared_ptr impl;
+  ScopedAssociation::shared_ptr assoc;
   framing::ProtocolVersion version;
+  
+  SessionCore::shared_ptr impl();
+
 public:
-    #{@classname}(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
-    ~#{@classname}();
+    #{@classname}();
+    #{@classname}(ScopedAssociation::shared_ptr);
 
-    framing::FrameSet::shared_ptr get() { return impl->get(); }
-    void setSynchronous(bool sync) { impl->setSync(sync); } 
+    framing::FrameSet::shared_ptr get() { return impl()->get(); }
+    void setSynchronous(bool sync) { impl()->setSync(sync); } 
     void close();
+    Execution& execution() { return impl()->getExecution(); }
+
 EOS
   indent { @amqp.classes.each { |c| declare_class(c) if !excludes.include?(c.name) } }
   gen <<EOS
@@ -112,24 +124,18 @@
 namespace qpid {
 namespace client {
 
-#{@classname}::#{@classname}(ConnectionImpl::shared_ptr _parent, SessionCore::shared_ptr _impl) : parent(_parent), impl(_impl) {}
+#{@classname}::#{@classname}() {}
+#{@classname}::#{@classname}(ScopedAssociation::shared_ptr _assoc) : assoc(_assoc) {}
 
-#{@classname}::~#{@classname}()
+SessionCore::shared_ptr #{@classname}::impl()
 {
-    impl->stop();
-    if (parent) { 
-        parent->released(impl);
-        parent.reset();
-    }
+    if (!assoc) throw Exception("Uninitialised session");
+    return assoc->session;
 }
 
 void #{@classname}::close()
 {
-    impl->close(); 
-    if (parent) { 
-        parent->released(impl);
-        parent.reset();
-    }
+    impl()->close(); 
 }
 
 EOS

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Thu Sep  6 13:27:33 2007
@@ -117,19 +117,15 @@
   end
 
   def methodbody_extra_defs(s)
-    if (s.content)
-      content = "true"
-    else 
-      content = "false"
-    end
-
     gen <<EOS
     using  AMQMethodBody::accept;
     void accept(MethodBodyConstVisitor& v) const { v.visit(*this); }
 
     inline ClassId amqpClassId() const { return CLASS_ID; }
     inline MethodId amqpMethodId() const { return METHOD_ID; }
-    inline bool isContentBearing() const { return  #{content}; }
+    inline bool isContentBearing() const { return  #{s.content ? "true" : "false" }; }
+    inline bool resultExpected() const { return  #{s.result ? "true" : "false"}; }
+    inline bool responseExpected() const { return  #{s.responses().empty? ? "false" : "true"}; }
 EOS
   end
 
@@ -201,7 +197,7 @@
         #as result structs have types that are only unique to the
         #class, they have a class dependent qualifier added to them
         #(this is inline with current python code but a formal
-        #solution is expected from the WG
+        #solution is expected from the WG)
         indent { genl "static const uint16_t TYPE = #{s.type_} + #{s.parent.parent.parent.index} * 256;" }
       else
         indent { genl "static const uint16_t TYPE = #{s.type_};" } 

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Sep  6 13:27:33 2007
@@ -98,6 +98,7 @@
 libqpidcommon_la_SOURCES = \
   $(rgen_common_cpp) \
   $(platform_src) \
+  qpid/framing/AccumulatedAck.cpp \
   qpid/framing/AMQBody.cpp \
   qpid/framing/AMQMethodBody.cpp \
   qpid/framing/AMQContentBody.cpp \
@@ -155,7 +156,6 @@
 
 libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams
 libqpidbroker_la_SOURCES = \
-  qpid/broker/AccumulatedAck.cpp \
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerAdapter.cpp \
   qpid/broker/BrokerSingleton.cpp \
@@ -218,14 +218,13 @@
   qpid/client/ExecutionHandler.cpp		\
   qpid/client/FutureCompletion.cpp		\
   qpid/client/FutureResponse.cpp		\
-  qpid/client/FutureFactory.cpp			\
+  qpid/client/FutureResult.cpp			\
   qpid/client/SessionCore.cpp			\
   qpid/client/StateManager.cpp
 
 
 nobase_include_HEADERS = \
   $(platform_hdr) \
-  qpid/broker/AccumulatedAck.h \
   qpid/broker/BrokerExchange.h \
   qpid/broker/BrokerQueue.h \
   qpid/broker/Consumer.h \
@@ -295,6 +294,7 @@
   qpid/client/Connection.h \
   qpid/client/ConnectionImpl.h \
   qpid/client/Connector.h \
+  qpid/client/Completion.h \
   qpid/client/MessageListener.h \
   qpid/client/BlockingQueue.h \
   qpid/client/Correlator.h \
@@ -302,13 +302,18 @@
   qpid/client/ChannelHandler.h \
   qpid/client/ChainableFrameHandler.h	\
   qpid/client/ConnectionHandler.h \
+  qpid/client/Execution.h \
   qpid/client/ExecutionHandler.h \
+  qpid/client/Future.h \
   qpid/client/FutureCompletion.h \
   qpid/client/FutureResponse.h \
-  qpid/client/FutureFactory.h \
+  qpid/client/FutureResult.h \
   qpid/client/Response.h \
+  qpid/client/ScopedAssociation.h \
   qpid/client/SessionCore.h \
   qpid/client/StateManager.h \
+  qpid/client/TypedResult.h \
+  qpid/framing/AccumulatedAck.h \
   qpid/framing/AMQBody.h \
   qpid/framing/AMQContentBody.h \
   qpid/framing/AMQDataBlock.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu Sep  6 13:27:33 2007
@@ -171,7 +171,6 @@
                             queue->getSettings(),
                             queue->getMessageCount(),
                             queue->getConsumerCount());
-
 }
 
 void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Sep  6 13:27:33 2007
@@ -65,7 +65,7 @@
     return id > tag;
 }
 
-bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
     return range->covers(id);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Sep  6 13:27:33 2007
@@ -25,7 +25,7 @@
 #include <list>
 #include <vector>
 #include <ostream>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
 #include "BrokerQueue.h"
 #include "Consumer.h"
 #include "DeliveryId.h"
@@ -56,7 +56,7 @@
     bool matches(DeliveryId tag) const;
     bool matchOrAfter(DeliveryId tag) const;
     bool after(DeliveryId tag) const;
-    bool coveredBy(const AccumulatedAck* const range) const;
+    bool coveredBy(const framing::AccumulatedAck* const range) const;
     void requeue() const;
     void release();
     void reject();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Thu Sep  6 13:27:33 2007
@@ -26,7 +26,7 @@
 using std::mem_fun_ref;
 using namespace qpid::broker;
 
-DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
 {
     remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), 
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Thu Sep  6 13:27:33 2007
@@ -24,7 +24,7 @@
 #include <algorithm>
 #include <functional>
 #include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
 #include "DeliveryRecord.h"
 #include "TxOp.h"
 
@@ -34,7 +34,7 @@
             std::list<DeliveryRecord> pending;
 
         public:
-            DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+            DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Thu Sep  6 13:27:33 2007
@@ -22,7 +22,6 @@
  *
  */
 
-#include "AccumulatedAck.h"
 #include "Consumer.h"
 #include "Deliverable.h"
 #include "DeliveryAdapter.h"
@@ -35,6 +34,7 @@
 #include "TxBuffer.h"
 #include "SemanticHandler.h"  // FIXME aconway 2007-08-31: remove
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AccumulatedAck.h"
 #include "qpid/shared_ptr.h"
 
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -116,7 +116,7 @@
     TxBuffer::shared_ptr txBuffer;
     DtxBuffer::shared_ptr dtxBuffer;
     bool dtxSelected;
-    AccumulatedAck accumulatedAck;
+    framing::AccumulatedAck accumulatedAck;
     bool opened;
     bool flowActive;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Thu Sep  6 13:27:33 2007
@@ -25,6 +25,7 @@
 using std::bind2nd;
 using std::mem_fun_ref;
 using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
 
 TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : 
     acked(_acked), unacked(_unacked){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Thu Sep  6 13:27:33 2007
@@ -24,7 +24,7 @@
 #include <algorithm>
 #include <functional>
 #include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
 #include "DeliveryRecord.h"
 #include "TxOp.h"
 
@@ -35,7 +35,7 @@
          * transactional channel.
          */
         class TxAck : public TxOp{
-            AccumulatedAck& acked;
+            framing::AccumulatedAck& acked;
             std::list<DeliveryRecord>& unacked;
 
         public:
@@ -44,7 +44,7 @@
              * acks received
              * @param unacked the record of delivered messages
              */
-            TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+            TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Sep  6 13:27:33 2007
@@ -56,7 +56,7 @@
 
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
     prefetch(_prefetch), transactional(_transactional), running(false), 
-    uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
+    uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
 {
 }
 
@@ -65,26 +65,25 @@
     join();
 }
 
-void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
+void Channel::open(const Session& s)
 {
+    Mutex::ScopedLock l(lock);
     if (isOpen())
         THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
-
-    connection = c;
-    sessionCore = s;
-    session = auto_ptr<Session>(new Session(c, s));
+    active = true;
+    session = s;
 }
     
 bool Channel::isOpen() const { 
     Mutex::ScopedLock l(lock);
-    return connection; 
+    return active; 
 }
 
 void Channel::setQos() {
-    session->basicQos(0, getPrefetch(), false);
+    session.basicQos(0, getPrefetch(), false);
     if(isTransactional()) {
         //I think this is wrong! should only send TxSelect once...
-        session->txSelect();
+        session.txSelect();
     }
 }
 
@@ -95,13 +94,13 @@
 
 void Channel::declareExchange(Exchange& exchange, bool synch){
     FieldTable args;
-    ScopedSync s(*session, synch);
-    session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
+    ScopedSync s(session, synch);
+    session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
 }
 
 void Channel::deleteExchange(Exchange& exchange, bool synch){
-    ScopedSync s(*session, synch);
-    session->exchangeDelete(0, exchange.getName(), false);
+    ScopedSync s(session, synch);
+    session.exchangeDelete(0, exchange.getName(), false);
 }
 
 void Channel::declareQueue(Queue& queue, bool synch){
@@ -112,30 +111,30 @@
     }
 
     FieldTable args;
-    ScopedSync s(*session, synch);
-    session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+    ScopedSync s(session, synch);
+    session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
                           queue.isExclusive(), queue.isAutoDelete(), args);
     
 }
 
 void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
-    ScopedSync s(*session, synch);
-    session->queueDelete(0, queue.getName(), ifunused, ifempty);
+    ScopedSync s(session, synch);
+    session.queueDelete(0, queue.getName(), ifunused, ifempty);
 }
 
 void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
     string e = exchange.getName();
     string q = queue.getName();
-    ScopedSync s(*session, synch);
-    session->queueBind(0, q, e, key, args);
+    ScopedSync s(session, synch);
+    session.queueBind(0, q, e, key, args);
 }
 
 void Channel::commit(){
-    session->txCommit();
+    session.txCommit();
 }
 
 void Channel::rollback(){
-    session->txRollback();
+    session.txRollback();
 }
 
 void Channel::consume(
@@ -155,8 +154,8 @@
         c.ackMode = ackMode;
         c.lastDeliveryTag = 0;
     }
-    ScopedSync s(*session, synch);
-    session->basicConsume(0, queue.getName(), tag, noLocal,
+    ScopedSync s(session, synch);
+    session.basicConsume(0, queue.getName(), tag, noLocal,
                           ackMode == NO_ACK, false, !synch,
                           fields ? *fields : FieldTable());
 }
@@ -171,13 +170,13 @@
         c = i->second;
         consumers.erase(i);
     }
-    ScopedSync s(*session, synch);
-    session->basicCancel(tag);
+    ScopedSync s(session, synch);
+    session.basicCancel(tag);
 }
 
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-    Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
-    sessionCore->flush();//TODO: need to expose the ability to request completion info through session
+    Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
+    session.execution().sendFlushRequest();
     if (response.isA<BasicGetEmptyBody>()) {
         return false;
     } else {
@@ -194,19 +193,15 @@
     const string e = exchange.getName();
     string key = routingKey;
 
-    session->basicPublish(0, e, key, mandatory, immediate, msg);
+    session.basicPublish(0, e, key, mandatory, immediate, msg);
 }
 
 void Channel::close()
 {
-    session->close();
+    session.close();
     {
         Mutex::ScopedLock l(lock);
-        if (connection);
-        {
-            sessionCore.reset();
-            connection.reset();
-        }
+        active = false;
     }
     stop();
 }
@@ -232,7 +227,7 @@
 void Channel::run() {
     try {
         while (true) {
-            FrameSet::shared_ptr content = session->get();
+            FrameSet::shared_ptr content = session.get();
             //need to dispatch this to the relevant listener:
             if (content->isA<BasicDeliverBody>()) {
                 ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Sep  6 13:27:33 2007
@@ -79,18 +79,17 @@
     bool running;
 
     ConsumerMap consumers;
-    ConnectionImpl::shared_ptr connection;
-    std::auto_ptr<Session> session;
-    SessionCore::shared_ptr sessionCore;
+    Session session;
     framing::ChannelId channelId;
     BlockingQueue<framing::FrameSet::shared_ptr> gets;
     framing::Uuid uniqueId;
     uint32_t nameCounter;
+    bool active;
 
     void stop();
 
     void setQos();
-    void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+    void open(const Session& session);
     void closeInternal();
     void join();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Sep  6 13:27:33 2007
@@ -25,6 +25,7 @@
 #include "Connection.h"
 #include "ClientChannel.h"
 #include "ClientMessage.h"
+#include "ScopedAssociation.h"
 #include "qpid/log/Logger.h"
 #include "qpid/log/Options.h"
 #include "qpid/log/Statement.h"
@@ -66,18 +67,15 @@
 }
 
 void Connection::openChannel(Channel& channel) {
-    ChannelId id = ++channelIdCounter;
-    SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
-    impl->allocated(session);
-    channel.open(impl, session);
-    session->open();
+    channel.open(newSession());
 }
 
 Session Connection::newSession() {
     ChannelId id = ++channelIdCounter;
     SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
-    impl->allocated(session);
-    return Session(impl, session);
+    ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
+    session->open();
+    return Session(assoc);
 }
 
 void Connection::close()

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Completion_
+#define _Completion_
+
+#include <boost/shared_ptr.hpp>
+#include "Future.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Completion
+{
+protected:
+    Future future;
+    SessionCore::shared_ptr session;
+
+public:
+    Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {}
+
+    void sync()
+    {
+        future.sync(*session);
+    }
+
+    bool isComplete() {
+        return future.isComplete();
+    }
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Thu Sep  6 13:27:33 2007
@@ -20,45 +20,101 @@
  */
 
 #include "CompletionTracker.h"
+#include <algorithm>
 
 using qpid::client::CompletionTracker;
 using namespace qpid::framing;
 using namespace boost;
 
+namespace 
+{
+const std::string empty;
+}
+
 CompletionTracker::CompletionTracker() {}
 CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
 
+void CompletionTracker::close()
+{   
+    sys::Mutex::ScopedLock l(lock);
+    while (!listeners.empty()) {
+        Record r(listeners.front());
+        {
+            sys::Mutex::ScopedUnlock u(lock);
+            r.completed();
+        }
+        listeners.pop_front();
+    }
+}
 
 void CompletionTracker::completed(const SequenceNumber& _mark)
 {   
     sys::Mutex::ScopedLock l(lock);
     mark = _mark;
-    while (!listeners.empty() && !(listeners.front().first > mark)) {
-        Listener f(listeners.front().second);
+    while (!listeners.empty() && !(listeners.front().id > mark)) {
+        Record r(listeners.front());
         {
             sys::Mutex::ScopedUnlock u(lock);
-            f();
+            r.completed();
         }
-        listeners.pop();
+        listeners.pop_front();
+    }
+}
+
+void CompletionTracker::received(const SequenceNumber& id, const std::string& result)
+{
+    sys::Mutex::ScopedLock l(lock);
+    Listeners::iterator i = seek(id);
+    if (i != listeners.end() && i->id == id) {
+        i->received(result);
+        listeners.erase(i);
     }
 }
 
-void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener)
 {
-    if (!add(point, listener)) {
+    if (!add(Record(point, listener))) {
         listener();
     }
 }
 
-bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener)
+{
+    if (!add(Record(point, listener))) {
+        listener(empty);
+    }
+}
+
+bool CompletionTracker::add(const Record& record)
 {
     sys::Mutex::ScopedLock l(lock);
-    if (point < mark) {
+    if (record.id < mark) {
         return false;
     } else {
-        listeners.push(make_pair(point, listener));
+        //insert at the correct position
+        Listeners::iterator i = seek(record.id);
+        if (i == listeners.end()) i = listeners.begin();
+        listeners.insert(i, record);
+
         return true;
     }
 }
 
+CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point)
+{
+    Listeners::iterator i = listeners.begin(); 
+    while (i != listeners.end() && i->id < point) i++;
+    return i;
+}
+
 
+void CompletionTracker::Record::completed() 
+{ 
+    if (f)     f(); 
+    else if(g) g(empty);//won't get a result if command is now complete
+}
+
+void CompletionTracker::Record::received(const std::string& result) 
+{ 
+    if (g) g(result);
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Thu Sep  6 13:27:33 2007
@@ -19,7 +19,7 @@
  *
  */
 
-#include <queue>
+#include <list>
 #include <boost/function.hpp>
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/SequenceNumber.h"
@@ -34,19 +34,40 @@
 class CompletionTracker
 {
 public:
-    typedef boost::function<void()> Listener;    
+    //typedef boost::function<void()> CompletionListener;    
+    typedef boost::function0<void> CompletionListener;    
+    typedef boost::function<void(const std::string&)> ResultListener;
 
     CompletionTracker();
     CompletionTracker(const framing::SequenceNumber& mark);
     void completed(const framing::SequenceNumber& mark);
-    void listen(const framing::SequenceNumber& point, Listener l);
+    void received(const framing::SequenceNumber& id, const std::string& result);
+    void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l);
+    void listenForResult(const framing::SequenceNumber& point, ResultListener l);
+    void close();
 
 private:
+    struct Record 
+    {
+        framing::SequenceNumber id; 
+        CompletionListener f;
+        ResultListener g;        
+
+        Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {}
+        Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {}
+        void completed();
+        void received(const std::string& result);
+
+    };
+
+    typedef std::list<Record> Listeners;
+
     sys::Mutex lock;
     framing::SequenceNumber mark;
-    std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+    Listeners listeners;
 
-    bool add(const framing::SequenceNumber& point, Listener l);
+    bool add(const Record& r);
+    Listeners::iterator seek(const framing::SequenceNumber&);
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Sep  6 13:27:33 2007
@@ -60,11 +60,11 @@
 void ConnectionImpl::incoming(framing::AMQFrame& frame)
 {
     uint16_t id = frame.getChannel();
-    SessionCore::shared_ptr session = sessions[id];
-    if (!session) {
+    SessionMap::iterator i = sessions.find(id);
+    if (i == sessions.end()) {
         throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
     }
-    session->handle(frame);
+    i->second->handle(frame);
 }
 
 void ConnectionImpl::open(const std::string& host, int port,
@@ -111,7 +111,8 @@
     connector->send(frame);
 }
 
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::shutdown() 
+{
     //this indicates that the socket to the server has closed
     for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
         i->second->closed(0, "Unexpected socket closure.");

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Execution_
+#define _Execution_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace client {
+
+class Execution 
+{
+public:
+    virtual ~Execution() {}
+    virtual void sendSyncRequest() = 0;
+    virtual void sendFlushRequest() = 0;
+    virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Sep  6 13:27:33 2007
@@ -97,8 +97,7 @@
 
 void ExecutionHandler::flush()
 {
-    //send completion
-    incoming.lwm = incoming.hwm;
+    sendCompletion();
 }
 
 void ExecutionHandler::noop()
@@ -106,48 +105,88 @@
     //do nothing
 }
 
-void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void ExecutionHandler::result(uint32_t command, const std::string& data)
 {
-    //TODO: need to signal the result to the appropriate listener
+    completion.received(command, data);
 }
 
 void ExecutionHandler::sync()
 {
-    //TODO: implement (the application is in charge of completion of
-    //some commands, so need to track completion for them).
+    //TODO: implement - need to note the mark requested and then
+    //remember to send a response when that point is reached
+}
 
-    //This shouldn't ever need to be called by the server (in my
-    //opinion) as the server never needs to synchronise with the
-    //clients execution
+void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
+{
+    if (point > outgoing.lwm) {
+        sendFlushRequest();
+    }        
 }
 
-void ExecutionHandler::sendFlush()
+void ExecutionHandler::sendFlushRequest()
 {
     AMQFrame frame(0, ExecutionFlushBody());
-    out(frame);        
+    out(frame);
 }
 
-void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
 {
-    //allocate id:
-    ++outgoing.hwm;
-    //register listeners if necessary:
-    if (f) {
-        completion.listen(outgoing.hwm, f);
-    }
-    if (g) {
-        correlation.listen(g);
+    if (point > outgoing.lwm) {
+        sendSyncRequest();
+    }        
+}
+
+
+void ExecutionHandler::sendSyncRequest()
+{
+    AMQFrame frame(0, ExecutionSyncBody());
+    out(frame);
+}
+
+void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
+{
+    if (id > completionStatus.mark) {
+        if (cumulative) {
+            completionStatus.update(completionStatus.mark, id);
+        } else {
+            completionStatus.update(id, id);            
+        }
     }
+    if (send) {
+        sendCompletion();
+    }    
+}
+
+
+void ExecutionHandler::sendCompletion()
+{
+    SequenceNumberSet range;
+    completionStatus.collectRanges(range);
+    AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+    out(frame);    
+}
 
-    AMQFrame frame(0/*id will be filled in be channel handler*/, command);
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+{
+    SequenceNumber id = ++outgoing.hwm;
+    if(l) {
+        completion.listenForResult(id, l);
+    }
+    AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
     out(frame);
+    return id;
 }
 
-void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, 
-                                   CompletionTracker::Listener f, Correlator::Listener g)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, 
+                                      CompletionTracker::ResultListener l)
 {
-    send(command, f, g);
+    SequenceNumber id = send(command, l);
+    sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+    return id;
+}
 
+void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+{
     AMQHeaderBody header;
     BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
     header.get<BasicHeaderProperties>(true)->setContentLength(data.size());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Sep  6 13:27:33 2007
@@ -22,28 +22,34 @@
 #define _ExecutionHandler_
 
 #include <queue>
+#include "qpid/framing/AccumulatedAck.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MethodContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "BlockingQueue.h"
 #include "ChainableFrameHandler.h"
 #include "CompletionTracker.h"
 #include "Correlator.h"
+#include "Execution.h"
 
 namespace qpid {
 namespace client {
 
 class ExecutionHandler : 
     private framing::AMQP_ServerOperations::ExecutionHandler,
-    public ChainableFrameHandler
+    public ChainableFrameHandler,
+    public Execution
 {
     framing::Window incoming;
     framing::Window outgoing;
     framing::FrameSet::shared_ptr arriving;
     Correlator correlation;
     CompletionTracker completion;
+    BlockingQueue<framing::FrameSet::shared_ptr> received; 
     framing::ProtocolVersion version;
     uint64_t maxFrameSize;
+    framing::AccumulatedAck completionStatus;
 
     void complete(uint32_t mark, const framing::SequenceNumberSet& range);    
     void flush();
@@ -51,22 +57,29 @@
     void result(uint32_t command, const std::string& data);
     void sync();
 
+    void sendCompletion();
+
+    void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+
 public:
-    BlockingQueue<framing::FrameSet::shared_ptr> received; 
+    typedef CompletionTracker::ResultListener ResultListener;
 
     ExecutionHandler(uint64_t maxFrameSize = 65536);
 
-    void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
-
     void handle(framing::AMQFrame& frame);
-    void send(const framing::AMQBody& command, 
-              CompletionTracker::Listener f = CompletionTracker::Listener(), 
-              Correlator::Listener g = Correlator::Listener());
-    void sendContent(const framing::AMQBody& command, 
-                     const framing::BasicHeaderProperties& headers, const std::string& data, 
-                     CompletionTracker::Listener f = CompletionTracker::Listener(), 
-                     Correlator::Listener g = Correlator::Listener());
-    void sendFlush();
+    framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
+    framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, 
+                                 ResultListener=ResultListener());
+    void sendSyncRequest();
+    void sendFlushRequest();
+    void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
+    void syncTo(const framing::SequenceNumber& point);
+    void flushTo(const framing::SequenceNumber& point);
+
+    void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
+    Correlator& getCorrelator() { return correlation; }
+    CompletionTracker& getCompletionTracker() { return completion; }
+    BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Future_
+#define _Future_
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Exception.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/StructHelper.h"
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Future : private framing::StructHelper
+{
+    framing::SequenceNumber command;
+    boost::shared_ptr<FutureResponse> response;
+    boost::shared_ptr<FutureResult> result;
+    bool complete;
+
+public:
+    Future() : complete(false) {}    
+    Future(const framing::SequenceNumber& id) : command(id), complete(false) {}    
+
+    void sync(SessionCore& session)
+    {
+        if (!complete) {
+            FutureCompletion callback;
+            session.getExecution().flushTo(command);
+            session.getExecution().getCompletionTracker().listenForCompletion(
+                command,                                                     
+                boost::bind(&FutureCompletion::completed, &callback)
+            );
+            callback.waitForCompletion();
+            session.checkClosed();
+            complete = true;
+        }
+    }
+
+    framing::AMQMethodBody* getResponse(SessionCore& session) 
+    {
+        if (response) {
+            session.getExecution().getCompletionTracker().listenForCompletion(
+                command,                                                     
+                boost::bind(&FutureResponse::completed, response)
+            );            
+            return response->getResponse(session);
+        } else {
+            throw Exception("Response not expected");
+        }
+    }
+
+    template <class T> void decodeResult(T& value, SessionCore& session) 
+    {
+        if (result) {
+            decode(value, result->getResult(session));
+        } else {
+            throw Exception("Result not expected");
+        }
+    }
+
+    bool isComplete() {
+        return complete;
+    }
+
+    void setCommandId(const framing::SequenceNumber& id) { command = id; }
+    void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; }
+    void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; }
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp Thu Sep  6 13:27:33 2007
@@ -24,9 +24,9 @@
 using namespace qpid::client;
 using namespace qpid::sys;
 
-FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+FutureCompletion::FutureCompletion() : complete(false) {}
 
-bool FutureCompletion::isComplete()
+bool FutureCompletion::isComplete() const
 {
     Monitor::ScopedLock l(lock);
     return complete;
@@ -39,23 +39,10 @@
     lock.notifyAll();
 }
 
-void FutureCompletion::waitForCompletion()
+void FutureCompletion::waitForCompletion() const
 {
     Monitor::ScopedLock l(lock);
-    while (!complete && !closed) {
+    while (!complete) {
         lock.wait();
     }
-    if (closed) {
-        throw ChannelException(code, text);
-    }
-}
-
-void FutureCompletion::close(uint16_t _code, const std::string& _text)
-{
-    Monitor::ScopedLock l(lock);
-    complete = true;
-    closed = true;
-    code = _code;
-    text = _text;        
-    lock.notifyAll();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h Thu Sep  6 13:27:33 2007
@@ -31,19 +31,15 @@
 class FutureCompletion 
 {
 protected:
-    sys::Monitor lock;
+    mutable sys::Monitor lock;
     bool complete;
-    bool closed;
-    uint16_t code;
-    std::string text;
 
 public:
     FutureCompletion();
     virtual ~FutureCompletion(){}
-    bool isComplete();
-    void waitForCompletion();
+    bool isComplete() const;
+    void waitForCompletion() const;
     void completed();
-    void close(uint16_t code, const std::string& text);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Sep  6 13:27:33 2007
@@ -21,14 +21,17 @@
 
 #include "FutureResponse.h"
 
+#include "SessionCore.h"
+
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
 
-AMQMethodBody* FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
 {
     waitForCompletion();
+    session.checkClosed();            
     return response.get();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Sep  6 13:27:33 2007
@@ -29,11 +29,13 @@
 namespace qpid {
 namespace client {
 
+class SessionCore;
+
 class FutureResponse : public FutureCompletion 
 {
     framing::MethodHolder response;
 public:
-    framing::AMQMethodBody* getResponse();
+    framing::AMQMethodBody* getResponse(SessionCore& session);
     void received(framing::AMQMethodBody* response);
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp Thu Sep  6 13:27:33 2007
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResult.h"
+
+#include "SessionCore.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+const std::string& FutureResult::getResult(SessionCore& session) const
+{
+    waitForCompletion();
+    session.checkClosed();            
+    return result;
+}
+
+void FutureResult::received(const std::string& r)
+{
+    Monitor::ScopedLock l(lock);
+    result = r;
+    complete = true;
+    lock.notifyAll();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResult_
+#define _FutureResult_
+
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class SessionCore;
+
+class FutureResult : public FutureCompletion 
+{
+    std::string result;
+public:
+    const std::string& getResult(SessionCore& session) const;
+    void received(const std::string& result);
+};
+
+}}
+
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h Thu Sep  6 13:27:33 2007
@@ -24,33 +24,26 @@
 
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/amqp_framing.h"
-#include "FutureResponse.h"
+#include "Completion.h"
 
 namespace qpid {
 namespace client {
 
-class Response
+class Response : public Completion
 {
-    boost::shared_ptr<FutureResponse> future;
-
 public:
-    Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+    Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {}
 
     template <class T> T& as() 
     {
-        framing::AMQMethodBody* response(future->getResponse());
-        assert(response);
+        framing::AMQMethodBody* response(future.getResponse(*session));
         return *boost::polymorphic_downcast<T*>(response);
     }
+
     template <class T> bool isA() 
     {
-        framing::AMQMethodBody* response(future->getResponse());
+        framing::AMQMethodBody* response(future.getResponse(*session));
         return response && response->isA<T>();
-    }
-    
-    void sync()
-    {
-        return future->waitForCompletion();
     }
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ScopedAssociation_
+#define _ScopedAssociation_
+
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+struct ScopedAssociation 
+{
+    typedef boost::shared_ptr<ScopedAssociation> shared_ptr;
+
+    SessionCore::shared_ptr session;
+    ConnectionImpl::shared_ptr connection;    
+
+    ScopedAssociation() {}
+
+    ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c) 
+    {
+        connection->allocated(session);
+    }
+
+    ~ScopedAssociation() 
+    { 
+        if (connection && session) connection->released(session); 
+    }
+};
+
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ScopedAssociation.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Thu Sep  6 13:27:33 2007
@@ -21,12 +21,15 @@
 
 #include "SessionCore.h"
 #include <boost/bind.hpp>
+#include "Future.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
 
 using namespace qpid::client;
 using namespace qpid::framing;
 
 SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out, 
-                         uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false)
+                         uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
 {
     l2.out = boost::bind(&FrameHandler::handle, out, _1);
     l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
@@ -39,47 +42,15 @@
     l2.open(id);
 }
 
-void SessionCore::flush()
+ExecutionHandler& SessionCore::getExecution()
 {
-    l3.sendFlush();
-}
-
-Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
-{
-    boost::shared_ptr<FutureResponse> f(futures.createResponse());
-    if (expectResponse) {
-        l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));    
-    } else {
-        l3.send(method, boost::bind(&FutureResponse::completed, f));    
-    }
-    if (sync) {
-        flush();
-        f->waitForCompletion();
-    }
-    return Response(f);
-}
-
-Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
-{
-    //TODO: lots of duplication between these two send methods; refactor
-    boost::shared_ptr<FutureResponse> f(futures.createResponse());
-    if (expectResponse) {
-        l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), 
-                       boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));    
-    } else {
-        l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), 
-                       boost::bind(&FutureResponse::completed, f));    
-    }
-    if (sync) {
-        flush();
-        f->waitForCompletion();
-    }
-    return Response(f);
+    checkClosed();
+    return l3;
 }
 
 FrameSet::shared_ptr SessionCore::get()
 {
-    return l3.received.pop();
+    return l3.getReceived().pop();
 }
 
 void SessionCore::setSync(bool s)
@@ -95,12 +66,13 @@
 void SessionCore::close()
 {
     l2.close();
-    l3.received.close();
+    stop();
 }
 
 void SessionCore::stop()
 {
-    l3.received.close();
+    l3.getReceived().close();
+    l3.getCompletionTracker().close();
 }
 
 void SessionCore::handle(AMQFrame& frame)
@@ -110,6 +82,46 @@
 
 void SessionCore::closed(uint16_t code, const std::string& text)
 {
-    l3.received.close();
-    futures.close(code, text);
+    stop();
+
+    isClosed = true;
+    reason.code = code;
+    reason.text = text;
+}
+
+void SessionCore::checkClosed()
+{
+    if (isClosed) {
+        throw ChannelException(reason.code, reason.text);
+    }
+}
+
+Future SessionCore::send(const AMQBody& command)
+{ 
+    Future f;
+    //any result/response listeners must be set before the command is sent
+    if (command.getMethod()->resultExpected()) {
+        boost::shared_ptr<FutureResult> r(new FutureResult());
+        f.setFutureResult(r);
+        //result listener is tied to command id, and is set when that
+        //is allocated by the execution handler, so pass it to send
+        f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1)));
+    } else {
+        if (command.getMethod()->responseExpected()) {
+            boost::shared_ptr<FutureResponse> r(new FutureResponse());
+            f.setFutureResponse(r);
+            l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1));
+        }
+
+        f.setCommandId(l3.send(command));
+    }
+    return f;
+}
+
+Future SessionCore::send(const AMQBody& command, const MethodContent& content)
+{
+    //content bearing methods don't currently have responses or
+    //results, if that changes should follow procedure for the other
+    //send method impl:
+    return Future(l3.send(command, content));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Thu Sep  6 13:27:33 2007
@@ -22,6 +22,7 @@
 #ifndef _SessionCore_
 #define _SessionCore_
 
+#include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/AMQMethodBody.h"
 #include "qpid/framing/FrameHandler.h"
@@ -29,35 +30,44 @@
 #include "qpid/framing/MethodContent.h"
 #include "ChannelHandler.h"
 #include "ExecutionHandler.h"
-#include "FutureFactory.h"
-#include "Response.h"
 
 namespace qpid {
 namespace client {
 
+class Future;
+
 class SessionCore : public framing::FrameHandler
 {
+    struct Reason
+    {
+        uint16_t code;
+        std::string text;
+    };
+
     ExecutionHandler l3;
     ChannelHandler l2;
-    FutureFactory futures;
     const uint16_t id;
     bool sync;
+    bool isClosed;
+    Reason reason;
     
 public:    
     typedef boost::shared_ptr<SessionCore> shared_ptr;
 
     SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
-    Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
-    Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
     framing::FrameSet::shared_ptr get();
     uint16_t getId() const { return id; } 
     void setSync(bool);
     bool isSync();
-    void flush();
     void open();
     void close();
     void stop();
     void closed(uint16_t code, const std::string& text);
+    void checkClosed();
+    ExecutionHandler& getExecution();
+
+    Future send(const framing::AMQBody& command);
+    Future send(const framing::AMQBody& command, const framing::MethodContent& content);
     
     //for incoming frames:
     void handle(framing::AMQFrame& frame);    

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h Thu Sep  6 13:27:33 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TypedResult_
+#define _TypedResult_
+
+#include "Completion.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class TypedResult : public Completion
+{
+    T result;
+    bool decoded;
+
+public:
+    TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {}
+
+    T& get() 
+    {
+        if (!decoded) {
+            future.decodeResult(result, *session);
+            decoded = true;
+        }
+        
+        return result;
+    }
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/TypedResult.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Thu Sep  6 13:27:33 2007
@@ -50,7 +50,9 @@
     virtual MethodId amqpMethodId() const = 0;
     virtual ClassId  amqpClassId() const = 0;
     virtual bool isContentBearing() const = 0;
-    
+    virtual bool resultExpected() const = 0;    
+    virtual bool responseExpected() const = 0;    
+
     void invoke(AMQP_ServerOperations&);
     bool invoke(Invocable*);
 

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp (from r572247, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp&r1=572247&r2=573359&rev=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp Thu Sep  6 13:27:33 2007
@@ -26,9 +26,9 @@
 using std::list;
 using std::max;
 using std::min;
-using namespace qpid::broker;
+using namespace qpid::framing;
 
-void AccumulatedAck::update(DeliveryId first, DeliveryId last){
+void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){
     assert(first <= last);
     if (last < mark) return;
 
@@ -84,7 +84,7 @@
     ranges.clear();
 }
 
-bool AccumulatedAck::covers(DeliveryId tag) const{
+bool AccumulatedAck::covers(SequenceNumber tag) const{
     if (tag <= mark) return true;
     for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
         if (i->contains(tag)) return true;
@@ -92,7 +92,15 @@
     return false;
 }
 
-bool Range::contains(DeliveryId i) const 
+void AccumulatedAck::collectRanges(SequenceNumberSet& set) const
+{
+    for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+        set.push_back(i->start);
+        set.push_back(i->end);
+    }
+}
+
+bool Range::contains(SequenceNumber i) const 
 { 
     return i >= start && i <= end; 
 }
@@ -113,7 +121,7 @@
     }
 }
 
-bool Range::mergeable(const DeliveryId& s) const
+bool Range::mergeable(const SequenceNumber& s) const
 { 
     if (contains(s) || start - s == 1) {
         return true;
@@ -122,11 +130,11 @@
     }
 }
 
-Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
+Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
 
 
 namespace qpid{
-namespace broker{
+namespace framing{
     std::ostream& operator<<(std::ostream& out, const Range& r)
     {
         out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h (from r572394, incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h&r1=572394&r2=573359&rev=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h Thu Sep  6 13:27:33 2007
@@ -25,21 +25,22 @@
 #include <functional>
 #include <list>
 #include <ostream>
-#include "DeliveryId.h"
+#include "SequenceNumber.h"
+#include "SequenceNumberSet.h"
 
 namespace qpid {
-    namespace broker {
+    namespace framing {
 
         struct Range
         {
-            DeliveryId start;
-            DeliveryId end;
+            SequenceNumber start;
+            SequenceNumber end;
 
-            Range(DeliveryId s, DeliveryId e);
-            bool contains(DeliveryId i) const;
+            Range(SequenceNumber s, SequenceNumber e);
+            bool contains(SequenceNumber i) const;
             bool intersect(const Range& r) const;
             bool merge(const Range& r);
-            bool mergeable(const DeliveryId& r) const;
+            bool mergeable(const SequenceNumber& r) const;
         };
         /**
          * Keeps an accumulated record of acked messages (by delivery
@@ -50,18 +51,19 @@
             /**
              * Everything up to this value has been acked.
              */
-            DeliveryId mark;
+            SequenceNumber mark;
             /**
              * List of individually acked messages greater than the
              * 'mark'.
              */
             std::list<Range> ranges;
 
-            explicit AccumulatedAck(DeliveryId r) : mark(r) {}
-            void update(DeliveryId firstTag, DeliveryId lastTag);
+            explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {}
+            void update(SequenceNumber firstTag, SequenceNumber lastTag);
             void consolidate();
             void clear();
-            bool covers(DeliveryId tag) const;
+            bool covers(SequenceNumber tag) const;
+            void collectRanges(SequenceNumberSet& set) const;
         };
         std::ostream& operator<<(std::ostream&, const Range&);
         std::ostream& operator<<(std::ostream&, const AccumulatedAck&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h Thu Sep  6 13:27:33 2007
@@ -44,7 +44,7 @@
         rbuffer.getRawData(data, size);        
     }
 
-    template <class T> void decode(T t, std::string& data) {
+    template <class T> void decode(T& t, const std::string& data) {
         char* bytes = static_cast<char*>(::alloca(data.length()));
         Buffer wbuffer(bytes, data.length());
         wbuffer.putRawData(data);        

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp Thu Sep  6 13:27:33 2007
@@ -19,13 +19,13 @@
  * under the License.
  *
  */
-#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
 #include "qpid_test_plugin.h"
 #include <iostream>
 #include <list>
 
 using std::list;
-using namespace qpid::broker;
+using namespace qpid::framing;
 
 class AccumulatedAckTest : public CppUnit::TestCase  
 {
@@ -44,12 +44,12 @@
 public:
     bool covers(const AccumulatedAck& ack, int i)
     {
-        return ack.covers(DeliveryId(i));
+        return ack.covers(SequenceNumber(i));
     }
 
     void update(AccumulatedAck& ack, int start, int end)
     {
-        ack.update(DeliveryId(start), DeliveryId(end));
+        ack.update(SequenceNumber(start), SequenceNumber(end));
     }
 
     void testGeneral()

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=573359&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Thu Sep  6 13:27:33 2007
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include "qpid_test_plugin.h"
+#include "InProcessBroker.h"
+#include "qpid/client/Session.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+class ClientSessionTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(ClientSessionTest);
+    CPPUNIT_TEST(testQueueQuery);;
+    CPPUNIT_TEST_SUITE_END();
+
+    boost::shared_ptr<Connector> broker;
+    Connection connection;
+    Session session;
+
+  public:
+
+    ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) 
+    {
+        connection.open("");
+        session = connection.newSession();
+    }
+
+    void testQueueQuery() 
+    {
+        std::string name("my-queue");
+        std::string alternate("amq.fanout");
+        session.queueDeclare(0, name, alternate, false, false, true, true, FieldTable());
+        TypedResult<QueueQueryResult> result = session.queueQuery(name);
+        CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
+        CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
+        CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
+    }
+
+    void testCompletion()
+    {
+        std::string queue("my-queue");
+        std::string dest("my-dest");
+        session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
+        //subcribe to the queue with confirm_mode = 1
+        session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
+        //publish some messages
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest);

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Thu Sep  6 13:27:33 2007
@@ -101,6 +101,7 @@
         ) : sender(sender_), conversation(conversation_), in(ih) {}
 
         void send(framing::AMQFrame& frame) {
+            //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl;
             conversation.push_back(TaggedFrame(sender, frame));
             in->received(frame);
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=573359&r1=573358&r2=573359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Sep  6 13:27:33 2007
@@ -94,10 +94,11 @@
   TxPublishTest		\
   ValueTest		\
   MessageHandlerTest    \
-  MessageBuilderTest
+  MessageBuilderTest    \
+  ClientSessionTest
 
 #client_unit_tests =	\
-  ClientChannelTest
+  ClientChannelTest     
 
 framing_unit_tests =	\
   FieldTableTest	\