You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/14 14:47:41 UTC

svn commit: r594879 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/client: SubscriptionManager.cpp SubscriptionManager.h

Author: aconway
Date: Wed Nov 14 05:47:39 2007
New Revision: 594879

URL: http://svn.apache.org/viewvc?rev=594879&view=rev
Log:
Added auto-ack and commit-mode control to SubscriptionManager API.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=594879&r1=594878&r2=594879&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov 14 05:47:39 2007
@@ -34,34 +34,42 @@
 
 SubscriptionManager::SubscriptionManager(Session_0_10& s)
     : dispatcher(s), session(s),
-      messages(UNLIMITED), bytes(UNLIMITED), window(true)
+      messages(UNLIMITED), bytes(UNLIMITED), window(true),
+      confirmMode(true)
 {}
 
+void SubscriptionManager::subscribeInternal(
+    const std::string& q, const std::string& dest)
+{
+    session.messageSubscribe(arg::queue=q, arg::destination=dest,
+                             arg::confirmMode=confirmMode);
+    setFlowControl(dest, messages, bytes, window);
+}
+
 void SubscriptionManager::subscribe(
-    MessageListener& listener, const std::string& q, const std::string& t)
+    MessageListener& listener, const std::string& q, const std::string& d)
 {
-    std::string tag=t.empty() ? q:t;
-    dispatcher.listen(tag, &listener);
-    session.messageSubscribe(arg::queue=q, arg::destination=tag);
-    setFlowControl(tag, messages, bytes, window);
+    std::string dest=d.empty() ? q:d;
+    dispatcher.listen(dest, &listener, autoAck);
+    subscribeInternal(q, dest);
 }
 
 void SubscriptionManager::subscribe(
-    LocalQueue& lq, const std::string& q, const std::string& t)
+    LocalQueue& lq, const std::string& q, const std::string& d)
 {
-    std::string tag=t.empty() ? q:t;
+    std::string dest=d.empty() ? q:d;
     lq.session=session;
-    lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
-    session.messageSubscribe(arg::queue=q, arg::destination=tag);
-    setFlowControl(tag, messages, bytes, window);
+    lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
+    lq.setAckPolicy(autoAck);
+    subscribeInternal(q, dest);
 }
 
 void SubscriptionManager::setFlowControl(
-    const std::string& tag, uint32_t messages,  uint32_t bytes, bool window)
+    const std::string& dest, uint32_t messages,  uint32_t bytes, bool window)
 {
-    session.messageFlowMode(tag, window); 
-    session.messageFlow(tag, 0, messages); 
-    session.messageFlow(tag, 1, bytes);
+    session.messageFlowMode(dest, window); 
+    session.messageFlow(dest, 0, messages); 
+    session.messageFlow(dest, 1, bytes);
 }
 
 void SubscriptionManager::setFlowControl(
@@ -72,10 +80,14 @@
     window=window_;
 }
 
-void SubscriptionManager::cancel(const std::string tag)
+void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; }
+
+void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+
+void SubscriptionManager::cancel(const std::string dest)
 {
-    dispatcher.cancel(tag);
-    session.messageCancel(tag);
+    dispatcher.cancel(dest);
+    session.messageCancel(dest);
 }
 
 void SubscriptionManager::run(bool autoStop)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=594879&r1=594878&r2=594879&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Wed Nov 14 05:47:39 2007
@@ -37,11 +37,15 @@
     typedef sys::Mutex::ScopedLock Lock;
     typedef sys::Mutex::ScopedUnlock Unlock;
 
+    void subscribeInternal(const std::string& q, const std::string& dest);
+    
     qpid::client::Dispatcher dispatcher;
     qpid::client::Session_0_10& session;
     uint32_t messages;
     uint32_t bytes;
     bool window;
+    AckPolicy autoAck;
+    bool confirmMode;
 
 public:
     SubscriptionManager(Session_0_10& session);
@@ -96,6 +100,17 @@
      *@param window: if true use window-based flow control.
      */
     void setFlowControl(uint32_t messages,  uint32_t bytes, bool window=true);
+
+    /** Set the confirm-mode for new subscriptions. Defaults to true.
+     *@param confirm: if true messages must be confirmed by calling
+     *Message::acknowledge() or automatically, see setAckPolicy()
+     */
+    void setConfirmMode(bool confirm);
+
+    /** Set the acknowledgement policy for new subscriptions.
+     * Default is to acknowledge every message automatically.
+     */
+    void setAckPolicy(const AckPolicy& autoAck);
 };