You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/25 03:55:07 UTC

svn commit: r707808 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/client/ tests/

Author: aconway
Date: Fri Oct 24 18:55:06 2008
New Revision: 707808

URL: http://svn.apache.org/viewvc?rev=707808&view=rev
Log:

Client API change: Centralize access to subscription status, better control of acquire/accept.

client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings
client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe.
client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire
client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools.

Issues addressed by the change:
 - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing.
 - old AckPolicy was broken - not possible to access the instance associated with an active subscription
 - old AckPolicy did not provide a way to do manual acquire, only accept.
 - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually.
 - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/RangeSet.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/echotest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Oct 24 18:55:06 2008
@@ -368,14 +368,15 @@
 
 libqpidclient_la_SOURCES =			\
   $(rgen_client_srcs)				\
-  qpid/client/AckPolicy.cpp			\
   qpid/client/Bounds.cpp			\
   qpid/client/Connection.cpp			\
   qpid/client/ConnectionHandler.cpp		\
+  qpid/client/ConnectionImpl.h 			\
   qpid/client/ConnectionImpl.cpp                \
   qpid/client/ConnectionSettings.cpp		\
   qpid/client/Connector.cpp	                \
   qpid/client/Demux.cpp				\
+  qpid/client/Dispatcher.h			\
   qpid/client/Dispatcher.cpp			\
   qpid/client/FailoverConnection.cpp            \
   qpid/client/FailoverSession.cpp               \
@@ -385,6 +386,7 @@
   qpid/client/Future.cpp			\
   qpid/client/FutureCompletion.cpp		\
   qpid/client/FutureResult.cpp			\
+  qpid/client/HandlePrivate.h			\
   qpid/client/LoadPlugins.cpp			\
   qpid/client/LocalQueue.cpp			\
   qpid/client/Message.cpp			\
@@ -395,8 +397,12 @@
   qpid/client/SessionBase_0_10.h		\
   qpid/client/SessionBase_0_10Access.h		\
   qpid/client/ConnectionAccess.h		\
+  qpid/client/SessionImpl.h \
   qpid/client/SessionImpl.cpp			\
   qpid/client/StateManager.cpp			\
+  qpid/client/Subscription.cpp			\
+  qpid/client/SubscriptionImpl.h		\
+  qpid/client/SubscriptionImpl.cpp		\
   qpid/client/SubscriptionManager.cpp
 
 nobase_include_HEADERS = \
@@ -500,25 +506,25 @@
   qpid/broker/TxPublish.h \
   qpid/broker/Vhost.h \
   qpid/client/AckMode.h \
-  qpid/client/AckPolicy.h			\
   qpid/client/Bounds.h \
   qpid/client/ChainableFrameHandler.h		\
   qpid/client/Completion.h \
   qpid/client/Connection.h \
   qpid/client/ConnectionHandler.h \
-  qpid/client/ConnectionImpl.h \
   qpid/client/ConnectionSettings.h \
   qpid/client/Connector.h \
   qpid/client/Demux.h \
-  qpid/client/Dispatcher.h \
   qpid/client/Execution.h \
   qpid/client/FailoverConnection.h \
   qpid/client/FailoverSession.h \
+  qpid/client/Subscription.h \
+  qpid/client/SubscriptionSettings.h \
   qpid/client/FailoverSubscriptionManager.h \
   qpid/client/FlowControl.h \
   qpid/client/Future.h \
   qpid/client/FutureCompletion.h \
   qpid/client/FutureResult.h \
+  qpid/client/Handle.h \
   qpid/client/LocalQueue.h \
   qpid/client/QueueOptions.h \
   qpid/client/Message.h \
@@ -527,7 +533,6 @@
   qpid/client/SessionBase_0_10.h \
   qpid/client/Session.h \
   qpid/client/AsyncSession.h \
-  qpid/client/SessionImpl.h \
   qpid/client/StateManager.h \
   qpid/client/SubscriptionManager.h \
   qpid/client/TypedResult.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RangeSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RangeSet.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RangeSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RangeSet.h Fri Oct 24 18:55:06 2008
@@ -27,6 +27,7 @@
 #include <boost/operators.hpp>
 #include <boost/bind.hpp>
 #include <algorithm>
+#include <numeric>
 
 namespace qpid {
 
@@ -53,7 +54,7 @@
 
     void begin(const T& t) { begin_ = t; }
     void end(const T& t) { end_ = t; }
-
+    size_t size() const { return end_ - begin_; }
     bool empty() const { return begin_ == end_; }
 
     bool contains(const T& x) const { return begin_ <= x && x < end_; }
@@ -172,6 +173,7 @@
     // The difference between the start and end of this range set
     uint32_t span() const;
 
+    size_t size() const;
     bool empty() const { return ranges.empty(); }
     void clear() { ranges.clear(); }
     
@@ -185,6 +187,7 @@
     template <class S> void decode(S& s) { uint16_t sz; s(sz); ranges.resize(sz/sizeof(Range<T>)); }
     
  private:
+    static size_t accumulateSize(size_t s, const Range<T>& r) { return s+r.size(); }
     Ranges ranges;
 
   template <class U> friend std::ostream& operator<<(std::ostream& o, const RangeSet<U>& r);
@@ -317,6 +320,9 @@
     return ranges.back().last() - ranges.front().first();
 }
 
+template <class T> size_t RangeSet<T>::size() const {
+    return std::accumulate(rangesBegin(), rangesEnd(), 0, &RangeSet<T>::accumulateSize);
+}
 
 } // namespace qpid
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Oct 24 18:55:06 2008
@@ -19,6 +19,7 @@
  *
  */
 #include "Dispatcher.h"
+#include "SubscriptionImpl.h"
 
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MessageTransferBody.h"
@@ -37,18 +38,6 @@
 namespace qpid {
 namespace client {
 
-Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a)
-    : session(s), listener(l), autoAck(a) {}
-
-void Subscriber::received(Message& msg)
-{
-
-    if (listener) {
-        listener->received(msg);
-        autoAck.ack(msg, session);
-    }
-}
-
 Dispatcher::Dispatcher(const Session& s, const std::string& q)
     : session(s), 
       running(false), 
@@ -78,7 +67,7 @@
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
                 Message msg(*content);
-                Subscriber::shared_ptr listener = find(msg.getDestination());
+                boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
                 if (!listener) {
                     QPID_LOG(error, "No listener found for destination " << msg.getDestination());
                 } else {
@@ -121,7 +110,7 @@
     autoStop = b;
 }
 
-Subscriber::shared_ptr Dispatcher::find(const std::string& name)
+boost::intrusive_ptr<SubscriptionImpl> Dispatcher::find(const std::string& name)
 {
     ScopedLock<Mutex> l(lock);
     Listeners::iterator i = listeners.find(name);
@@ -131,24 +120,12 @@
     return i->second;
 }
 
-void Dispatcher::listen(
-    MessageListener* listener, AckPolicy autoAck
-)
-{
-    ScopedLock<Mutex> l(lock);
-    defaultListener = Subscriber::shared_ptr(
-        new Subscriber(session, listener, autoAck));
-}
-
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
-{
+void Dispatcher::listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription) {
     ScopedLock<Mutex> l(lock);
-    listeners[destination] = Subscriber::shared_ptr(
-        new Subscriber(session, listener, autoAck));
+    listeners[subscription->getName()] = subscription;
 }
 
-void Dispatcher::cancel(const std::string& destination)
-{
+void Dispatcher::cancel(const std::string& destination) {
     ScopedLock<Mutex> l(lock);
     listeners.erase(destination);
     if (autoStop && listeners.empty())

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Fri Oct 24 18:55:06 2008
@@ -30,24 +30,12 @@
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "MessageListener.h"
-#include "AckPolicy.h"
+#include "SubscriptionImpl.h"
 
 namespace qpid {
 namespace client {
 
-///@internal
-class Subscriber : public MessageListener
-{
-    AsyncSession session;
-    MessageListener* const listener;
-    AckPolicy autoAck;
-
-public:
-    typedef boost::shared_ptr<Subscriber> shared_ptr;
-    Subscriber(const Session& session, MessageListener* listener, AckPolicy);
-    void received(Message& msg);
-    
-};
+class SubscriptionImpl;
 
 ///@internal
 typedef framing::Handler<framing::FrameSet> FrameSetHandler;
@@ -55,7 +43,7 @@
 ///@internal
 class Dispatcher : public sys::Runnable
 {
-    typedef std::map<std::string, Subscriber::shared_ptr> Listeners;
+    typedef std::map<std::string, boost::intrusive_ptr<SubscriptionImpl>  >Listeners;
     sys::Mutex lock;
     sys::Thread worker;
     Session session;
@@ -63,10 +51,10 @@
     bool running;
     bool autoStop;
     Listeners listeners;
-    Subscriber::shared_ptr defaultListener;
+    boost::intrusive_ptr<SubscriptionImpl> defaultListener;
     std::auto_ptr<FrameSetHandler> handler;
 
-    Subscriber::shared_ptr find(const std::string& name);
+    boost::intrusive_ptr<SubscriptionImpl> find(const std::string& name);
     bool isStopped();
 
     boost::function<void ()> failoverHandler;
@@ -84,8 +72,7 @@
       failoverHandler = fh;
     }
 
-    void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
-    void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
+    void listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription);
     void cancel(const std::string& destination);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Fri Oct 24 18:55:06 2008
@@ -60,7 +60,7 @@
     std::string qname=session.getId().getName();
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
-    subscriptions->subscribe(*this, qname, FlowControl::unlimited());
+    subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
     thread = sys::Thread(*subscriptions);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Fri Oct 24 18:55:06 2008
@@ -68,7 +68,7 @@
 void 
 FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
                                          const std::string & queue,
-                                         const FlowControl & flow,
+                                         const SubscriptionSettings & settings,
                                          const std::string & tag,
                                          bool                record_this
 )
@@ -77,11 +77,11 @@
 
     subscriptionManager->subscribe ( listener,
                                      queue,
-                                     flow,
+                                     settings,
                                      tag
     );
     if ( record_this )
-      subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&, bool) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag, false ) );
+      subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) )  &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) );
 }
 
 
@@ -89,7 +89,7 @@
 void 
 FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
                                          const std::string & queue,
-                                         const FlowControl & flow,
+                                         const SubscriptionSettings & settings,
                                          const std::string & tag,
                                          bool                record_this
 )
@@ -98,12 +98,12 @@
 
     subscriptionManager->subscribe ( localQueue,
                                      queue,
-                                     flow,
+                                     settings,
                                      tag
     );
 
     if ( record_this )
-      subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&, bool) )  &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, false ) );
+      subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) )  &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) );
 }
 
 
@@ -245,109 +245,4 @@
     lock.notifyAll();
 }
 
-
-
-void 
-FailoverSubscriptionManager::setFlowControl ( const std::string & destination,
-                                              const FlowControl & flow 
-)
-{
-
-    subscriptionManager->setFlowControl ( destination, flow );
-}
-
-
-
-void 
-FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow )
-{
-
-    subscriptionManager->setFlowControl ( flow );
-}
-
-
-
-const FlowControl & 
-FailoverSubscriptionManager::getFlowControl ( ) const
-{
-
-    return subscriptionManager->getFlowControl ( );
-}
-
-
-
-
-void 
-FailoverSubscriptionManager::setFlowControl ( const std::string & tag,
-                                              uint32_t messages,
-                                              uint32_t bytes,
-                                              bool window 
-)
-{
-
-    subscriptionManager->setFlowControl ( tag,
-                                          messages,
-                                          bytes,
-                                          window
-    );
-}
-
-
-
-void 
-FailoverSubscriptionManager::setFlowControl ( uint32_t messages,
-                                              uint32_t bytes,
-                                              bool window
-)
-{
-
-    subscriptionManager->setFlowControl ( messages,
-                                          bytes,
-                                          window
-    );
-}
-
-
-
-void 
-FailoverSubscriptionManager::setAcceptMode ( bool required )
-{
-
-    subscriptionManager->setAcceptMode ( required );
-}
-
-
-
-void 
-FailoverSubscriptionManager::setAcquireMode ( bool acquire )
-{
-
-    subscriptionManager->setAcquireMode ( acquire );
-}
-
-
-
-void 
-FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck )
-{
-
-    subscriptionManager->setAckPolicy ( autoAck );
-}
-
-
-
-AckPolicy & 
-FailoverSubscriptionManager::getAckPolicy()
-{
-
-    return subscriptionManager->getAckPolicy ( );
-}
-
-
-
-
-
-
-
-
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Fri Oct 24 18:55:06 2008
@@ -31,7 +31,7 @@
 #include <qpid/client/MessageListener.h>
 #include <qpid/client/SubscriptionManager.h>
 #include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/SubscriptionSettings.h>
 #include <qpid/sys/Runnable.h>
 #include <qpid/sys/Monitor.h>
 
@@ -50,13 +50,13 @@
 
     void subscribe ( MessageListener   & listener,
                      const std::string & queue,
-                     const FlowControl & flow,
+                     const SubscriptionSettings & ,
                      const std::string & tag = std::string(),
                      bool  record_this = true );
 
     void subscribe ( LocalQueue        & localQueue,
                      const std::string & queue,
-                     const FlowControl & flow,
+                     const SubscriptionSettings & ,
                      const std::string & tag=std::string(),
                      bool  record_this = true );
 
@@ -84,32 +84,6 @@
 
     void stop ( );
 
-    void setFlowControl ( const std::string & destintion, 
-                          const FlowControl & flow );
-
-    void setFlowControl ( const FlowControl & flow );
-
-    const FlowControl & getFlowControl ( ) const;
-
-    void setFlowControl ( const std::string & tag, 
-                          uint32_t messages,  
-                          uint32_t bytes, 
-                          bool window=true );
-
-    void setFlowControl ( uint32_t messages,  
-                          uint32_t bytes, 
-                          bool window = true
-                        );
-
-    void setAcceptMode ( bool required );
-
-    void setAcquireMode ( bool acquire );
-
-    void setAckPolicy ( const AckPolicy & autoAck );
-
-    AckPolicy & getAckPolicy();
-
-
     // Get ready for a failover.
     void prepareForFailover ( Session newSession );
     void failover ( );

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Fri Oct 24 18:55:06 2008
@@ -22,6 +22,8 @@
  *
  */
 
+#include <qpid/sys/IntegerTypes.h>
+
 namespace qpid {
 namespace client {
 
@@ -40,9 +42,8 @@
  * is renewed.
  *
  * In "window mode" credit is automatically renewed when a message is
- * acknowledged (@see AckPolicy) In non-window mode credit is not
- * automatically renewed, it must be explicitly re-set (@see
- * SubscriptionManager)
+ * accepted. In non-window mode credit is not automatically renewed,
+ * it must be explicitly re-set (@see Subscription)
  */
 struct FlowControl {
     static const uint32_t UNLIMITED=0xFFFFFFFF;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,61 @@
+#ifndef QPID_CLIENT_HANDLE_H
+#define QPID_CLIENT_HANDLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+template <class T> class HandlePrivate;
+
+/**
+ * A handle is like a pointer: it points to some underlying object.
+ * Handles can be null,  like a 0 pointer. Use isValid(), isNull() or the
+ * implicit conversion to bool to test for a null handle.
+ */
+template <class T> class Handle {
+  public:
+    ~Handle();
+    Handle(const Handle&);
+    Handle& operator=(const Handle&);
+
+    /**@return true if handle is valid,  i.e. not null. */
+    bool isValid() const { return impl; }
+
+    /**@return true if handle is null. It is an error to call any function on a null handle. */
+    bool isNull() const { return !impl; }
+
+    operator bool() const { return impl; }
+    bool operator !() const { return impl; }
+
+    void swap(Handle<T>&);
+
+  protected:
+    Handle(T* =0);
+    T* impl;
+
+  friend class HandlePrivate<T>;
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_HANDLE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Handle.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,41 @@
+#ifndef QPID_CLIENT_HANDLEACCESS_H
+#define QPID_CLIENT_HANDLEACCESS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <Handle.h>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Provide access to the private impl member of a Handle.
+ */
+template <class T>
+class HandleAccess
+{
+  public:
+    static boost::shared_ptr<T> getImpl(Handle<T>& h) { return h.impl; }
+};
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_HANDLEACCESS_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandleAccess.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,61 @@
+#ifndef QPID_CLIENT_HANDLEPRIVATE_H
+#define QPID_CLIENT_HANDLEPRIVATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <algorithm>
+
+namespace qpid {
+namespace client {
+
+/** @file
+ * Private implementation of handle, include in .cpp file of handle
+ * subclasses _after_ including the declaration of class T.
+ * T can be any class that can be used with boost::intrusive_ptr.
+ */
+
+template <class T>
+Handle<T>::Handle(T* p) : impl(p) { if (impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>::~Handle() { if(impl) boost::intrusive_ptr_release(impl); }
+
+template <class T>
+Handle<T>::Handle(const Handle& h) : impl(h.impl) { if(impl) boost::intrusive_ptr_add_ref(impl); }
+
+template <class T>
+Handle<T>& Handle<T>::operator=(const Handle<T>& h) { Handle<T>(h).swap(*this); return *this; }
+
+template <class T>
+void Handle<T>::swap(Handle<T>& h) { std::swap(impl, h.impl); }
+
+
+/** Access to private impl of a Handle */
+template <class T>
+class HandlePrivate {
+  public:
+    static boost::intrusive_ptr<T> get(Handle<T>& h) { return boost::intrusive_ptr<T>(h.impl); }
+};
+
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_HANDLEPRIVATE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/HandlePrivate.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Fri Oct 24 18:55:06 2008
@@ -22,13 +22,15 @@
 #include "qpid/Exception.h"
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "HandlePrivate.h"
+#include "SubscriptionImpl.h"
 
 namespace qpid {
 namespace client {
 
 using namespace framing;
 
-LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::LocalQueue() {}
 LocalQueue::~LocalQueue() {}
 
 Message LocalQueue::pop() { return get(); }
@@ -48,7 +50,9 @@
     if (!ok) return false;
     if (content->isA<MessageTransferBody>()) {
         result = Message(*content);
-        autoAck.ack(result, session);
+        boost::intrusive_ptr<SubscriptionImpl> si = HandlePrivate<SubscriptionImpl>::get(subscription);
+        assert(si);
+        if (si) si->received(result);
         return true;
     }
     else
@@ -56,9 +60,6 @@
             QPID_MSG("Unexpected method: " << content->getMethod()));
 }
 
-void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
-AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
-
 bool LocalQueue::empty() const
 { 
     if (!queue)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Fri Oct 24 18:55:06 2008
@@ -23,8 +23,8 @@
  */
 
 #include "qpid/client/Message.h"
+#include "qpid/client/Subscription.h"
 #include "qpid/client/Demux.h"
-#include "qpid/client/AckPolicy.h"
 #include "qpid/sys/Time.h"
 
 namespace qpid {
@@ -38,17 +38,14 @@
  *
  * \ingroup clientapi
  */
-class LocalQueue
-{
+class LocalQueue {
   public:
     /** Create a local queue. Subscribe the local queue to a remote broker
      * queue with a SubscriptionManager.
      *
      * LocalQueue is an alternative to implementing a MessageListener.
-     * 
-     *@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
      */
-    LocalQueue(AckPolicy ackPolicy=AckPolicy());
+    LocalQueue();
 
     ~LocalQueue();
 
@@ -74,16 +71,9 @@
     /** Number of messages on the local queue */
     size_t size() const;
 
-    /** Set the message acknowledgement policy. @see AckPolicy. */
-    void setAckPolicy(AckPolicy);
-
-    /** Get the message acknowledgement policy. @see AckPolicy. */
-    AckPolicy& getAckPolicy();
-
   private:
-    Session session;
     Demux::QueuePtr queue;
-    AckPolicy autoAck;
+    Subscription subscription;
 
   friend class SubscriptionManager;
 };

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Fri Oct 24 18:55:06 2008
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Subscription.h"
+#include "SubscriptionImpl.h"
+#include "HandlePrivate.h"
+
+namespace qpid {
+namespace client {
+
+template class Handle<SubscriptionImpl>;
+
+
+std::string Subscription::getName() const { return impl->getName(); }
+std::string Subscription::getQueue() const { return impl->getQueue(); }
+const SubscriptionSettings& Subscription::getSettings() const { return impl->getSettings(); }
+void Subscription::setFlowControl(const FlowControl& f) { impl->setFlowControl(f); }
+void Subscription::setAutoAck(size_t n) { impl->setAutoAck(n); }
+SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired(); }
+SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
+void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
+void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+Session Subscription::getSession() const { return impl->getSession(); }
+SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
+void Subscription::cancel() { impl->cancel(); }
+
+}} // namespace qpid::client
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTION_H
+#define QPID_CLIENT_SUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Handle.h"
+#include "qpid/client/Message.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionImpl;
+class SubscriptionManager;
+
+/**
+ * A handle to an active subscription. Provides methods to query the subscription status
+ * and control acknowledgement (acquire and accept) of messages.
+ */
+class Subscription : public Handle<SubscriptionImpl> {
+  public:
+    Subscription(SubscriptionImpl* si=0) : Handle<SubscriptionImpl>(si) {}
+    
+    /** The name of the subsctription, used as the "destination" for messages from the broker.
+     * Usually the same as the queue name but can be set differently.
+     */
+    std::string getName() const;
+
+    /** Name of the queue this subscription subscribes to */
+    std::string getQueue() const;
+
+    /** Get the flow control and acknowledgement settings for this subscription */
+    const SubscriptionSettings& getSettings() const;
+
+    /** Set the flow control parameters */
+    void setFlowControl(const FlowControl&);
+
+    /** Automatically acknowledge (acquire and accept) batches of n messages.
+     * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+     * to manually acquire and accept messages.
+     */
+    void setAutoAck(unsigned int n);
+
+    /** Get the set of ID's for messages received by this subscription but not yet acquired.
+     * This will always be empty if getSettings().acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+     */
+    SequenceSet getUnacquired() const;
+
+    /** Get the set of ID's for messages received by this subscription but not yet accepted. */
+    SequenceSet getUnaccepted() const;
+
+    /** Acquire messageIds and remove them from the unacquired set.
+     * oAdd them to the unaccepted set if getSettings().acceptMode == ACCEPT_MODE_EXPLICIT.
+     */
+    void acquire(const SequenceSet& messageIds);
+
+    /** Accept messageIds and remove them from the unaccepted set.
+     *@pre messageIds is a subset of getUnaccepted()
+     */
+    void accept(const SequenceSet& messageIds);
+
+    /* Acquire a single message */
+    void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
+
+    /* Accept a single message */
+    void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+
+    /** Get the session associated with this subscription */
+    Session getSession() const;
+
+    /** Get the subscription manager associated with this subscription */
+    SubscriptionManager& getSubscriptionManager() const;
+
+    /** Cancel the subscription. */
+    void cancel();
+};
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_SUBSCRIPTION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Fri Oct 24 18:55:06 2008
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SubscriptionImpl.h"
+#include "SubscriptionManager.h"
+#include "SubscriptionSettings.h"
+
+namespace qpid {
+namespace client {
+
+using sys::Mutex;
+
+SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
+    : manager(m), name(n), queue(q), settings(s), listener(l)
+{
+    async(manager.getSession()).messageSubscribe( 
+        arg::queue=queue,
+        arg::destination=name,
+        arg::acceptMode=settings.acceptMode,
+        arg::acquireMode=settings.acquireMode);
+    setFlowControl(settings.flowControl);
+}
+
+std::string SubscriptionImpl::getName() const { return name; }
+
+std::string SubscriptionImpl::getQueue() const { return queue; }
+
+const SubscriptionSettings& SubscriptionImpl::getSettings() const {
+    Mutex::ScopedLock l(lock);
+    return settings;
+}
+
+void SubscriptionImpl::setFlowControl(const FlowControl& f) {
+    Mutex::ScopedLock l(lock);
+    AsyncSession s=manager.getSession();
+    if (&settings.flowControl != &f) settings.flowControl = f;
+    s.messageSetFlowMode(name, f.window); 
+    s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages); 
+    s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes);
+    s.sync();
+}
+
+void SubscriptionImpl::setAutoAck(size_t n) {
+    Mutex::ScopedLock l(lock);
+    settings.autoAck = n;
+}
+
+SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; }
+SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; }
+
+void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
+    Mutex::ScopedLock l(lock);
+    manager.getSession().messageAcquire(messageIds);
+    unacquired.remove(messageIds);
+    if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+        unaccepted.add(messageIds);
+}
+
+void SubscriptionImpl::accept(const SequenceSet& messageIds) {
+    Mutex::ScopedLock l(lock);
+    manager.getSession().messageAccept(messageIds);
+    unaccepted.remove(messageIds);
+}
+
+Session SubscriptionImpl::getSession() const { return manager.getSession(); }
+
+SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
+
+void SubscriptionImpl::cancel() { manager.cancel(name); }
+
+void SubscriptionImpl::received(Message& m) {
+    Mutex::ScopedLock l(lock);
+    manager.getSession().markCompleted(m.getId(), false, false);        
+    if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) 
+        unacquired.add(m.getId());
+    else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
+        unaccepted.add(m.getId());
+
+    if (listener) {
+        Mutex::ScopedUnlock u(lock);
+        listener->received(m);
+    }
+
+    if (settings.autoAck) {
+        if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
+            if (unacquired.size()) {
+                async(manager.getSession()).messageAcquire(unacquired);
+                unaccepted.add(unacquired);
+                unaccepted.clear();
+            }
+            async(manager.getSession()).messageAccept(unaccepted);
+            unaccepted.clear();
+        }
+    }
+}
+
+}} // namespace qpid::client
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,99 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONIMPL_H
+#define QPID_CLIENT_SUBSCRIPTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace client {
+
+class SubscriptionManager;
+
+class SubscriptionImpl : public RefCounted, public MessageListener {
+  public:
+    SubscriptionImpl(SubscriptionManager&, const std::string& queue,
+                     const SubscriptionSettings&, const std::string& name, MessageListener* =0);
+    
+    /** The name of the subsctription, used as the "destination" for messages from the broker.
+     * Usually the same as the queue name but can be set differently.
+     */
+    std::string getName() const;
+
+    /** Name of the queue this subscription subscribes to */
+    std::string getQueue() const;
+
+    /** Get the flow control and acknowledgement settings for this subscription */
+    const SubscriptionSettings& getSettings() const;
+
+    /** Set the flow control parameters */
+    void setFlowControl(const FlowControl&);
+
+    /** Automatically acknowledge (acquire and accept) batches of n messages.
+     * You can disable auto-acknowledgement by setting n=0, and use acquire() and accept()
+     * to manually acquire and accept messages.
+     */
+    void setAutoAck(size_t n);
+
+    /** Get the set of ID's for messages received by this subscription but not yet acquired.
+     * This will always be empty if  acquireMode=ACQUIRE_MODE_PRE_ACQUIRED
+     */
+    SequenceSet getUnacquired() const;
+
+    /** Get the set of ID's for messages acquired by this subscription but not yet accepted. */
+    SequenceSet getUnaccepted() const;
+
+    /** Acquire messageIds and remove them from the un-acquired set for the session. */
+    void acquire(const SequenceSet& messageIds);
+
+    /** Accept messageIds and remove them from the un-acceptd set for the session. */
+    void accept(const SequenceSet& messageIds);
+
+    /** Get the session associated with this subscription */
+    Session getSession() const;
+
+    /** Get the subscription manager associated with this subscription */
+    SubscriptionManager& getSubscriptionManager() const;
+
+    /** Cancel the subscription. */
+    void cancel();
+
+    void received(Message&);
+    
+  private:
+
+    mutable sys::Mutex lock;
+    SubscriptionManager& manager;
+    std::string name, queue;
+    SubscriptionSettings settings;
+    framing::SequenceSet unacquired, unaccepted;
+    MessageListener* listener;
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_SUBSCRIPTIONIMPL_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Oct 24 18:55:06 2008
@@ -22,6 +22,7 @@
 #define _Subscription_
 
 #include "SubscriptionManager.h"
+#include "SubscriptionImpl.h"
 #include <qpid/client/Dispatcher.h>
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
@@ -34,83 +35,41 @@
 namespace client {
 
 SubscriptionManager::SubscriptionManager(const Session& s)
-    : dispatcher(s), session(s),
-      flowControl(UNLIMITED, UNLIMITED, false),
-      acceptMode(0), acquireMode(0),
-      autoStop(true)
+  : dispatcher(s), session(s), autoStop(true)
 {}
 
-void SubscriptionManager::subscribeInternal(
-    const std::string& q, const std::string& dest, const FlowControl& fc)
+Subscription SubscriptionManager::subscribe(
+    MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
 {
-    session.messageSubscribe( 
-        arg::queue=q, arg::destination=dest,
-        arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
-    if (fc.messages || fc.bytes) // No need to set if all 0.
-        setFlowControl(dest, fc);
+    std::string name=n.empty() ? q:n;
+    boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
+    dispatcher.listen(si);
+    return subscriptions[name] = Subscription(si.get());
 }
 
-void SubscriptionManager::subscribe(
-    MessageListener& listener, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+    LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
 {
-    subscribe(listener, q, getFlowControl(), d);
+    std::string name=n.empty() ? q:n;
+    lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
+    boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+    lq.subscription = Subscription(si.get());
+    return subscriptions[name] = lq.subscription;
 }
 
-void SubscriptionManager::subscribe(
-    MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+    MessageListener& listener, const std::string& q, const std::string& n)
 {
-    std::string dest=d.empty() ? q:d;
-    dispatcher.listen(dest, &listener, autoAck);
-    return subscribeInternal(q, dest, fc);
+    return subscribe(listener, q, defaultSettings, n);
 }
 
-void SubscriptionManager::subscribe(
-    LocalQueue& lq, const std::string& q, const std::string& d)
+Subscription SubscriptionManager::subscribe(
+    LocalQueue& lq, const std::string& q, const std::string& n)
 {
-    subscribe(lq, q, getFlowControl(), d);
+    return subscribe(lq, q, defaultSettings, n);
 }
 
-void SubscriptionManager::subscribe(
-    LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
-{
-    std::string dest=d.empty() ? q:d;
-    lq.session=session;
-    lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
-    return subscribeInternal(q, dest, fc);
-}
-
-void SubscriptionManager::setFlowControl(
-    const std::string& dest, uint32_t messages,  uint32_t bytes, bool window)
-{
-    session.messageSetFlowMode(dest, window); 
-    session.messageFlow(dest, 0, messages); 
-    session.messageFlow(dest, 1, bytes);
-    session.sync();
-}
-
-void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
-    setFlowControl(dest, fc.messages, fc.bytes, fc.window);
-}
-
-void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
-
-void SubscriptionManager::setFlowControl(
-    uint32_t messages_,  uint32_t bytes_, bool window_)
-{
-    setFlowControl(FlowControl(messages_, bytes_, window_));
-}
-
-const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
-
-void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
-
-void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
-
-void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
-
-AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } 
-
-void SubscriptionManager::cancel(const std::string dest)
+void SubscriptionManager::cancel(const std::string& dest)
 {
     sync(session).messageCancel(dest);
     dispatcher.cancel(dest);
@@ -138,10 +97,11 @@
 bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
     LocalQueue lq;
     std::string unique = framing::Uuid(true).str();
-    subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+    subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
     AutoCancel ac(*this, unique);
     //first wait for message to be delivered if a timeout has been specified
-    if (timeout && lq.get(result, timeout)) return true;
+    if (timeout && lq.get(result, timeout))
+        return true;
     //make sure message is not on queue before final check
     sync(session).messageFlush(unique);
     return lq.get(result, 0);
@@ -149,6 +109,10 @@
 
 Session SubscriptionManager::getSession() const { return session; }
 
+Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+    return subscriptions.at(name);
+}
+
 void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
     dispatcher.registerFailoverHandler(fh);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Oct 24 18:55:06 2008
@@ -25,9 +25,10 @@
 #include <qpid/client/Dispatcher.h>
 #include <qpid/client/Completion.h>
 #include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/Subscription.h>
 #include <qpid/sys/Runnable.h>
 #include <set>
 #include <sstream>
@@ -48,15 +49,10 @@
     typedef sys::Mutex::ScopedLock Lock;
     typedef sys::Mutex::ScopedUnlock Unlock;
 
-    void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
-    
     qpid::client::Dispatcher dispatcher;
     qpid::client::AsyncSession session;
-    FlowControl flowControl;
-    AckPolicy autoAck;
-    bool acceptMode;
-    bool acquireMode;
     bool autoStop;
+    SubscriptionSettings defaultSettings;
     
   public:
     /** Create a new SubscriptionManager associated with a session */
@@ -70,14 +66,13 @@
      * 
      *@param listener Listener object to receive messages.
      *@param queue Name of the queue to subscribe to.
-     *@param flow initial FlowControl for the subscription.
-     *@param tag Unique destination tag for the listener.
-     * If not specified, the queue name is used.
+     *@param settings settings for the subscription.
+     *@param name unique destination name for the subscription, defaults to queue name.
      */
-    void subscribe(MessageListener& listener,
-                   const std::string& queue,
-                   const FlowControl& flow,
-                   const std::string& tag=std::string());
+    Subscription subscribe(MessageListener& listener,
+                           const std::string& queue,
+                           const SubscriptionSettings& settings,
+                           const std::string& name=std::string());
 
     /**
      * Subscribe a LocalQueue to receive messages from queue.
@@ -86,13 +81,13 @@
      * 
      *@param queue Name of the queue to subscribe to.
      *@param flow initial FlowControl for the subscription.
-     *@param tag Unique destination tag for the listener.
+     *@param name unique destination name for the subscription, defaults to queue name.
      * If not specified, the queue name is used.
      */
-    void subscribe(LocalQueue& localQueue,
-                   const std::string& queue,
-                   const FlowControl& flow,
-                   const std::string& tag=std::string());
+    Subscription subscribe(LocalQueue& localQueue,
+                           const std::string& queue,
+                           const SubscriptionSettings& settings,
+                           const std::string& name=std::string());
 
     /**
      * Subscribe a MessagesListener to receive messages from queue.
@@ -102,12 +97,12 @@
      * 
      *@param listener Listener object to receive messages.
      *@param queue Name of the queue to subscribe to.
-     *@param tag Unique destination tag for the listener.
+     *@param name unique destination name for the subscription, defaults to queue name.
      * If not specified, the queue name is used.
      */
-    void subscribe(MessageListener& listener,
-                   const std::string& queue,
-                   const std::string& tag=std::string());
+    Subscription subscribe(MessageListener& listener,
+                           const std::string& queue,
+                           const std::string& name=std::string());
 
     /**
      * Subscribe a LocalQueue to receive messages from queue.
@@ -115,12 +110,12 @@
      * Incoming messages are stored in the queue for you to retrieve.
      * 
      *@param queue Name of the queue to subscribe to.
-     *@param tag Unique destination tag for the listener.
+     *@param name unique destination name for the subscription, defaults to queue name.
      * If not specified, the queue name is used.
      */
-    void subscribe(LocalQueue& localQueue,
-                   const std::string& queue,
-                   const std::string& tag=std::string());
+    Subscription subscribe(LocalQueue& localQueue,
+                           const std::string& queue,
+                           const std::string& name=std::string());
 
 
     /** Get a single message from a queue.
@@ -131,8 +126,13 @@
      */
     bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
 
-    /** Cancel a subscription. */
-    void cancel(const std::string tag);
+    /** Get a subscription by name, returns a null Subscription handle
+     * if not found.
+     */
+    Subscription getSubscription(const std::string& name) const;
+    
+    /** Cancel a subscription. See also: Subscription.cancel() */
+    void cancel(const std::string& name);
 
     /** Deliver messages in the current thread until stop() is called.
      * Only one thread may be running in a SubscriptionManager at a time.
@@ -157,53 +157,65 @@
 
     static const uint32_t UNLIMITED=0xFFFFFFFF;
 
-    /** Set the flow control for destination. */
-    void setFlowControl(const std::string& destintion, const FlowControl& flow);
-
-    /** Set the default initial flow control for subscriptions that do not specify it. */
-    void setFlowControl(const FlowControl& flow);
+    /** Set the flow control for a subscription. */
+    void setFlowControl(const std::string& name, const FlowControl& flow) {
+        getSubscription(name).setFlowControl(flow);
+    }
 
-    /** Get the default flow control for new subscriptions that do not specify it. */
-    const FlowControl& getFlowControl() const;
-
-    /** Set the flow control for destination tag.
-     *@param tag: name of the destination.
+    /** Set the flow control for a subscription.
+     *@param name: name of the subscription.
      *@param messages: message credit.
      *@param bytes: byte credit.
      *@param window: if true use window-based flow control.
      */
-    void setFlowControl(const std::string& tag, uint32_t messages,  uint32_t bytes, bool window=true);
+    void setFlowControl(const std::string& name, uint32_t messages,  uint32_t bytes, bool window=true) {
+        setFlowControl(name, messages, bytes, window);
+    }
 
-    /** Set the initial flow control settings to be applied to each new subscribtion.
-     *@param messages: message credit.
-     *@param bytes: byte credit.
-     *@param window: if true use window-based flow control.
+    /** Set the default settings for subscribe() calls that don't
+     * include a SubscriptionSettings parameter.
+     */
+    void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+
+    /** Get the default settings for subscribe() calls that don't
+     * include a SubscriptionSettings parameter.
      */
-    void setFlowControl(uint32_t messages,  uint32_t bytes, bool window=true);
+    const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
 
-    /** Set the accept-mode for new subscriptions. Defaults to true.
-     *@param required: if true messages must be confirmed by calling
-     *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy()
+    /** Get the default settings for subscribe() calls that don't
+     * include a SubscriptionSettings parameter.
      */
-    void setAcceptMode(bool required);
+    SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
 
-    /** Set the acquire-mode for new subscriptions. Defaults to false.
-     *@param acquire: if false messages pre-acquired, if true
-     * messages are dequed on acknowledgement or on transfer 
-     * depending on acceptMode.
+    /**
+     * Set the default flow control settings for subscribe() calls
+     * that don't include a SubscriptionSettings parameter.
+     *
+     *@param messages: message credit.
+     *@param bytes: byte credit.
+     *@param window: if true use window-based flow control.
      */
-    void setAcquireMode(bool acquire);
+    void setFlowControl(uint32_t messages,  uint32_t bytes, bool window=true) {
+        defaultSettings.flowControl = FlowControl(messages, bytes, window);
+    }
 
-    /** Set the acknowledgement policy for new subscriptions.
-     * Default is to acknowledge every message automatically.
+    /**
+     *Set the default accept-mode for subscribe() calls that don't
+     *include a SubscriptionSettings parameter.
      */
-    void setAckPolicy(const AckPolicy& autoAck);
+    void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
 
-    AckPolicy& getAckPolicy();
+    /**
+     * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+     */
+    void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
 
     void registerFailoverHandler ( boost::function<void ()> fh );
 
     Session getSession() const;
+
+  private:
+    std::map<std::string, Subscription> subscriptions;
 };
 
 /** AutoCancel cancels a subscription in its destructor */

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h?rev=707808&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h Fri Oct 24 18:55:06 2008
@@ -0,0 +1,62 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+#define QPID_CLIENT_SUBSCRIPTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/FlowControl.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+
+/** Bring AMQP enum definitions for message class into this namespace. */
+using namespace qpid::framing::message;
+
+/**
+ * Settings for a subscription.
+ */
+struct SubscriptionSettings
+{
+    SubscriptionSettings(
+        FlowControl flow=FlowControl::unlimited(),
+        AcceptMode accept=ACCEPT_MODE_EXPLICIT,
+        AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
+        unsigned int autoAck_=1
+    ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {}
+                         
+    FlowControl flowControl;    ///@< Flow control settings. @see FlowControl
+    AcceptMode acceptMode;      ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
+    AcquireMode acquireMode;    ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED
+
+    /** Automatically acknowledge (acquire and accept) batches of autoAck messages.
+     * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on
+     * acceptMode and acquireMode:
+     *  - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
+     *  - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command
+     *  - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command
+     *  - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands
+     */
+    unsigned int autoAck;
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_SUBSCRIPTIONSETTINGS_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Oct 24 18:55:06 2008
@@ -20,8 +20,7 @@
  */
 #include "unit_test.h"
 #include "BrokerFixture.h"
-#include "qpid/client/AckPolicy.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
@@ -52,22 +51,22 @@
     std::vector<Message> messages;
     string name;
     uint expected;
-    Dispatcher dispatcher;
+    SubscriptionManager submgr;
 
     DummyListener(Session& session, const string& n, uint ex) :
-        name(n), expected(ex), dispatcher(session) {}
+        name(n), expected(ex), submgr(session) {}
 
     void run()
     {
-        dispatcher.listen(name, this);
-        dispatcher.run();
+        submgr.subscribe(*this, name);
+        submgr.run();
     }
 
     void received(Message& msg)
     {
         messages.push_back(msg);
         if (--expected == 0) {
-            dispatcher.stop();
+            submgr.stop();
         }
     }
 };
@@ -95,53 +94,30 @@
 
 struct ClientSessionFixture : public ProxySessionFixture
 {
-    ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
-
-    void declareSubscribe(const string& q="my-queue",
-                          const string& dest="my-dest")
-    {
-        session.queueDeclare(arg::queue=q);
-        session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1);
-        session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages
-        session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes
+    ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+        session.queueDeclare(arg::queue="my-queue");
     }
 };
 
 QPID_AUTO_TEST_CASE(testQueueQuery) {
     ClientSessionFixture fix;
     fix.session = fix.connection.newSession();
-    fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true);
-    QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+    fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout",
+                             arg::exclusive=true, arg::autoDelete=true);
+    QueueQueryResult result = fix.session.queueQuery("q");
     BOOST_CHECK_EQUAL(false, result.getDurable());
     BOOST_CHECK_EQUAL(true, result.getExclusive());
-    BOOST_CHECK_EQUAL(string("amq.fanout"),
-                      result.getAlternateExchange());
-}
-
-QPID_AUTO_TEST_CASE(testTransfer)
-{
-    ClientSessionFixture fix;
-    fix.session=fix.connection.newSession();
-    fix.declareSubscribe();
-    fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue"));
-    //get & test the message:
-    FrameSet::shared_ptr msg = fix.session.get();
-    BOOST_CHECK(msg->isA<MessageTransferBody>());
-    BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
-    //confirm receipt:
-    AckPolicy autoAck;
-    autoAck.ack(Message(*msg), fix.session);
+    BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange());
 }
 
 QPID_AUTO_TEST_CASE(testDispatcher)
 {
     ClientSessionFixture fix;
     fix.session =fix.connection.newSession();
-    fix.declareSubscribe();
     size_t count = 100;
     for (size_t i = 0; i < count; ++i) 
         fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
-    DummyListener listener(fix.session, "my-dest", count);
+    DummyListener listener(fix.session, "my-queue", count);
     listener.run();
     BOOST_CHECK_EQUAL(count, listener.messages.size());        
     for (size_t i = 0; i < count; ++i) 
@@ -152,9 +128,8 @@
 {
     ClientSessionFixture fix;
     fix.session =fix.connection.newSession();
-    fix.declareSubscribe();
     size_t count = 10;
-    DummyListener listener(fix.session, "my-dest", count);
+    DummyListener listener(fix.session, "my-queue", count);
     sys::Thread t(listener);
     for (size_t i = 0; i < count; ++i) {
         fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
@@ -190,7 +165,6 @@
 {
     ClientSessionFixture fix;
     fix.session.timeout(60);
-    fix.declareSubscribe();
     fix.session.suspend();
     // Make sure we are still subscribed after resume.
     fix.connection.resume(fix.session);
@@ -234,7 +208,7 @@
     BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
     BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
     BOOST_CHECK(lq.empty());    // Credit exhausted.
-    fix.subs.setFlowControl("lq", FlowControl::unlimited());
+    fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited());
     BOOST_CHECK_EQUAL("foo2", lq.pop().getData());    
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Oct 24 18:55:06 2008
@@ -168,8 +168,10 @@
     ProxySessionFixture f;
     std::string q("my-ring-queue");
     f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
-    LocalQueue incoming(AckPolicy(0));//no automatic acknowledgements
-    f.subs.subscribe(incoming, q);
+    LocalQueue incoming;
+    SubscriptionSettings settings(FlowControl::unlimited());
+    settings.autoAck = 0; // no auto ack.
+    Subscription sub = f.subs.subscribe(incoming, q, settings); 
     for (int i = 0; i < 5; i++) {
         f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp Fri Oct 24 18:55:06 2008
@@ -29,7 +29,7 @@
 #include "qpid/framing/TransferContent.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/LocalQueue.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/SubscriptionManager.h"
@@ -54,30 +54,6 @@
 
 Shlib shlib("../.libs/xml.so");
 
-struct DummyListener : public sys::Runnable, public MessageListener {
-    std::vector<Message> messages;
-    string name;
-    uint expected;
-    Dispatcher dispatcher;
-
-    DummyListener(Session& session, const string& n, uint ex) :
-        name(n), expected(ex), dispatcher(session) {}
-
-    void run()
-    {
-        dispatcher.listen(name, this);
-        dispatcher.run();
-    }
-
-    void received(Message& msg)
-    {
-        messages.push_back(msg);
-        if (--expected == 0)
-            dispatcher.stop();
-    }
-};
-
-
 class SubscribedLocalQueue : public LocalQueue {
   private:
     SubscriptionManager& subscriptions;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp Fri Oct 24 18:55:06 2008
@@ -75,11 +75,11 @@
         if (opts.declare)
             session.queueDeclare(opts.queue);
         SubscriptionManager subs(session);
-        LocalQueue lq(AckPolicy(opts.ack));
-        subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
-        subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED,
-                            false);
-        subs.subscribe(lq, opts.queue);
+        LocalQueue lq;
+        SubscriptionSettings settings;
+        settings.acceptMode = opts.ack > 0 ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
+        settings.flowControl = FlowControl(opts.count, SubscriptionManager::UNLIMITED,false);
+        Subscription sub = subs.subscribe(lq, opts.queue, settings);
         Message msg;
         AbsTime begin=now();        
         for (size_t i = 0; i < opts.count; ++i) {
@@ -87,7 +87,7 @@
             QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
         }
         if (opts.ack != 0)
-            subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+            sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch.
         AbsTime end=now();
         double secs(double(Duration(begin,end))/TIME_SEC);
         if (opts.summary) cout << opts.count/secs << endl;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/echotest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/echotest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/echotest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/echotest.cpp Fri Oct 24 18:55:06 2008
@@ -92,9 +92,7 @@
 {
     session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true);
     request.getDeliveryProperties().setRoutingKey(queue);
-    subscriptions.setAcceptMode(1/*not-required*/);
-    subscriptions.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
-    subscriptions.subscribe(*this, queue);    
+    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));    
 
     request.getDeliveryProperties().setTimestamp(current_time());
     if (size) request.setData(std::string(size, 'X'));

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Fri Oct 24 18:55:06 2008
@@ -204,14 +204,15 @@
         std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
         session.queuePurge(arg::queue=queue);
     }
+    SubscriptionSettings settings;
     if (opts.prefetch) {
-        mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
-        mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
+        settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2));
+        settings.flowControl = FlowControl::messageWindow(opts.prefetch);
     } else {
-        mgr.setAcceptMode(1/*not-required*/);
-        mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+        settings.acceptMode = ACCEPT_MODE_NONE;
+        settings.flowControl = FlowControl::unlimited();
     }
-    mgr.subscribe(*this, queue);    
+    mgr.subscribe(*this, queue, settings);    
 }
 
 void Receiver::test()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Fri Oct 24 18:55:06 2008
@@ -560,11 +560,12 @@
         try {            
             if (opts.txSub) sync(session).txSelect();
             SubscriptionManager subs(session);
-            LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
-            subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
-            subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
-                                false);
-            subs.subscribe(lq, queue);
+            SubscriptionSettings settings;
+            settings.autoAck = opts.txSub ? opts.txSub : opts.ack;
+            settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT);
+            settings.flowControl = FlowControl::messageCredit(opts.subQuota);
+            LocalQueue lq;
+            Subscription subscription = subs.subscribe(lq, queue, settings);
             // Notify controller we are ready.
             session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
             if (opts.txSub) {
@@ -603,7 +604,7 @@
                     }
                 }
                 if (opts.txSub || opts.ack)
-                    lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+                    subscription.accept(subscription.getUnaccepted());
                 if (opts.txSub) {
                     if (opts.commitAsync) session.txCommit();
                     else sync(session).txCommit();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Fri Oct 24 18:55:06 2008
@@ -66,6 +66,7 @@
 public:
     Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
     virtual void received(Message& msg);
+    Subscription subscription;
 };
 
 /**
@@ -118,14 +119,15 @@
             //set up listener
             SubscriptionManager mgr(session);
             Listener listener(session, mgr, "response", args.transactional);
+            SubscriptionSettings settings;
             if (args.prefetch) {
-                mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
-                mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
+                settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2));
+                settings.flowControl = FlowControl::messageCredit(args.prefetch);
             } else {
-                mgr.setAcceptMode(1/*-not-required*/);
-                mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+                settings.acceptMode = ACCEPT_MODE_NONE;
+                settings.flowControl = FlowControl::unlimited();
             }
-            mgr.subscribe(listener, control);
+            listener.subscription =  mgr.subscribe(listener, control, settings);
             session.sync();
 
             if( args.statusqueue.length() > 0 ) {
@@ -170,7 +172,7 @@
     if(string("TERMINATION_REQUEST") == type){
         shutdown();
     }else if(string("REPORT_REQUEST") == type){
-        mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
+        subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
         cout <<"Batch ended, sending report." << endl;
         //send a report:
         report();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp?rev=707808&r1=707807&r2=707808&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txtest.cpp Fri Oct 24 18:55:06 2008
@@ -139,9 +139,10 @@
             else session.txSelect();
             SubscriptionManager subs(session);
             
-            LocalQueue lq(AckPolicy(0));//manual acking
-            subs.setFlowControl(opts.msgsPerTx, SubscriptionManager::UNLIMITED, true);
-            subs.subscribe(lq, src);
+            LocalQueue lq;
+            SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx));
+            settings.autoAck = 0; // Disabled
+            Subscription sub = subs.subscribe(lq, src, settings);
             
             for (uint t = 0; t < opts.txCount; t++) {
                 Message in;
@@ -157,7 +158,7 @@
                     out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
                     session.messageTransfer(arg::content=out, arg::acceptMode=1);
                 }
-                lq.getAckPolicy().ackOutstanding(session);
+                sub.accept(sub.getUnaccepted());
                 if (opts.dtx) {
                     session.dtxEnd(arg::xid=xid);
                     session.dtxPrepare(arg::xid=xid);
@@ -230,8 +231,6 @@
     int check() 
     {
         SubscriptionManager subs(session);
-        subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
-        subs.setAcceptMode(1/*not-required*/);
 
         // Recover DTX transactions (if any)
         if (opts.dtx) {
@@ -262,9 +261,10 @@
         StringSet drained;
         //drain each queue and verify the correct set of messages are available
         for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
-            //subscribe, allocate credit and flush
-            LocalQueue lq(AckPolicy(0));//manual acking
-            subs.subscribe(lq, *i, *i);
+            //subscribe, allocate credit and flushn
+            LocalQueue lq;
+            SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE);
+            subs.subscribe(lq, *i, settings);
             session.messageFlush(arg::destination=*i);
             session.sync();