You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/14 14:47:41 UTC
svn commit: r594879 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/client:
SubscriptionManager.cpp SubscriptionManager.h
Author: aconway
Date: Wed Nov 14 05:47:39 2007
New Revision: 594879
URL: http://svn.apache.org/viewvc?rev=594879&view=rev
Log:
Added auto-ack and confirm-mode control to SubscriptionManager API.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=594879&r1=594878&r2=594879&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov 14 05:47:39 2007
@@ -34,34 +34,42 @@
SubscriptionManager::SubscriptionManager(Session_0_10& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true)
+ messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ confirmMode(true)
{}
+void SubscriptionManager::subscribeInternal(
+ const std::string& q, const std::string& dest)
+{
+ session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ arg::confirmMode=confirmMode);
+ setFlowControl(dest, messages, bytes, window);
+}
+
void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& t)
+ MessageListener& listener, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
- dispatcher.listen(tag, &listener);
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ std::string dest=d.empty() ? q:d;
+ dispatcher.listen(dest, &listener, autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& t)
+ LocalQueue& lq, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
+ std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
+ lq.setAckPolicy(autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
- const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+ const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageFlowMode(tag, window);
- session.messageFlow(tag, 0, messages);
- session.messageFlow(tag, 1, bytes);
+ session.messageFlowMode(dest, window);
+ session.messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes);
}
void SubscriptionManager::setFlowControl(
@@ -72,10 +80,14 @@
window=window_;
}
-void SubscriptionManager::cancel(const std::string tag)
+void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; }
+
+void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+
+void SubscriptionManager::cancel(const std::string dest)
{
- dispatcher.cancel(tag);
- session.messageCancel(tag);
+ dispatcher.cancel(dest);
+ session.messageCancel(dest);
}
void SubscriptionManager::run(bool autoStop)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=594879&r1=594878&r2=594879&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Wed Nov 14 05:47:39 2007
@@ -37,11 +37,15 @@
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
+ void subscribeInternal(const std::string& q, const std::string& dest);
+
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
uint32_t messages;
uint32_t bytes;
bool window;
+ AckPolicy autoAck;
+ bool confirmMode;
public:
SubscriptionManager(Session_0_10& session);
@@ -96,6 +100,17 @@
*@param window: if true use window-based flow control.
*/
void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+
+ /** Set the confirm-mode for new subscriptions. Defaults to true.
+ *@param confirm: if true, messages must be confirmed, either explicitly
+ *by calling Message::acknowledge() or automatically; see setAckPolicy().
+ */
+ void setConfirmMode(bool confirm);
+
+ /** Set the acknowledgement policy for new subscriptions.
+ * Default is to acknowledge every message automatically.
+ */
+ void setAckPolicy(const AckPolicy& autoAck);
};