You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/05 22:12:55 UTC
svn commit: r711698 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/client/ tests/
Author: gsim
Date: Wed Nov 5 13:12:54 2008
New Revision: 711698
URL: http://svn.apache.org/viewvc?rev=711698&view=rev
Log:
Added ability to release messages through the Subscription class (+test)
Added another mode for managing completion (+test)
Fixed regression where bytes credit was not reallocated in windowing mode after an accept/release
Fixed regression where subscribe request is issued before listener is registered with dispatcher
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Nov 5 13:12:54 2008
@@ -43,7 +43,8 @@
cancelled(false),
completed(false),
ended(accepted),
- windowing(_windowing)
+ windowing(_windowing),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0)
{}
void DeliveryRecord::setEnded()
@@ -153,7 +154,7 @@
uint32_t DeliveryRecord::getCredit() const
{
- return msg.payload ? msg.payload->getRequiredCredit() : 0;
+ return credit;
}
void DeliveryRecord::acquire(DeliveryIds& results) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Nov 5 13:12:54 2008
@@ -64,6 +64,15 @@
bool ended;
const bool windowing;
+ /**
+ * Record required credit on construction as the pointer to the
+ * message may be reset once we no longer need to deliver it
+ * (e.g. when it is accepted), but we will still need to be able
+ * to reallocate credit when it is completed (which could happen
+ * after that).
+ */
+ const uint32_t credit;
+
public:
DeliveryRecord(
const QueuedMessage& msg,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Nov 5 13:12:54 2008
@@ -48,12 +48,16 @@
if (exceeded) {
if (!policyExceeded) {
policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ if (m.queue) {
+ QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ }
}
} else {
if (policyExceeded) {
policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ if (m.queue) {
+ QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ }
}
}
return !exceeded;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp Wed Nov 5 13:12:54 2008
@@ -49,6 +49,11 @@
impl->send(b).wait(*impl);
}
+void SessionBase_0_10::markCompleted(const framing::SequenceSet& ids, bool notifyPeer)
+{
+ impl->markCompleted(ids, notifyPeer);
+}
+
void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
{
impl->markCompleted(id, cumulative, notifyPeer);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h Wed Nov 5 13:12:54 2008
@@ -104,6 +104,7 @@
Execution& getExecution();
void flush();
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
void sendCompletion();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Nov 5 13:12:54 2008
@@ -217,6 +217,16 @@
};
+void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer)
+{
+ Lock l(state);
+ incompleteIn.remove(ids);
+ completedIn.add(ids);
+ if (notifyPeer) {
+ sendCompletion();
+ }
+}
+
void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
{
Lock l(state);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Wed Nov 5 13:12:54 2008
@@ -89,6 +89,7 @@
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Wed Nov 5 13:12:54 2008
@@ -38,6 +38,7 @@
SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); }
Session Subscription::getSession() const { return impl->getSession(); }
SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
void Subscription::cancel() { impl->cancel(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Wed Nov 5 13:12:54 2008
@@ -79,12 +79,20 @@
*/
void accept(const SequenceSet& messageIds);
+ /** Release messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ void release(const SequenceSet& messageIds);
+
/* Acquire a single message */
void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
/* Accept a single message */
void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+ /* Release a single message */
+ void release(const Message& m) { release(SequenceSet(m.getId())); }
+
/** Get the session associated with this subscription */
Session getSession() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Wed Nov 5 13:12:54 2008
@@ -31,6 +31,9 @@
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
+{}
+
+void SubscriptionImpl::subscribe()
{
async(manager.getSession()).messageSubscribe(
arg::queue=queue,
@@ -79,11 +82,25 @@
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
- if (settings.autoComplete) {
+ switch (settings.completionMode) {
+ case COMPLETE_ON_ACCEPT:
+ manager.getSession().markCompleted(messageIds, true);
+ break;
+ case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
+ break;
+ default://do nothing
+ break;
}
}
+void SubscriptionImpl::release(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageRelease(messageIds);
+ if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+ unaccepted.remove(messageIds);
+}
+
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
@@ -102,16 +119,23 @@
listener->received(m);
}
- if (settings.autoComplete) {
+ if (settings.completionMode == COMPLETE_ON_DELIVERY) {
manager.getSession().markCompleted(m.getId(), false, false);
}
if (settings.autoAck) {
if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
- unaccepted.clear();
- if (settings.autoComplete) {
+ switch (settings.completionMode) {
+ case COMPLETE_ON_ACCEPT:
+ manager.getSession().markCompleted(unaccepted, true);
+ break;
+ case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
+ break;
+ default://do nothing
+ break;
}
+ unaccepted.clear();
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Wed Nov 5 13:12:54 2008
@@ -70,15 +70,21 @@
/** Acquire messageIds and remove them from the un-acquired set for the session. */
void acquire(const SequenceSet& messageIds);
- /** Accept messageIds and remove them from the un-acceptd set for the session. */
+ /** Accept messageIds and remove them from the un-accepted set for the session. */
void accept(const SequenceSet& messageIds);
+ /** Release messageIds and remove them from the un-accepted set for the session. */
+ void release(const SequenceSet& messageIds);
+
/** Get the session associated with this subscription */
Session getSession() const;
/** Get the subscription manager associated with this subscription */
SubscriptionManager& getSubscriptionManager() const;
+ /** Send subscription request and issue appropriate flow control commands. */
+ void subscribe();
+
/** Cancel the subscription. */
void cancel();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov 5 13:12:54 2008
@@ -44,6 +44,8 @@
std::string name=n.empty() ? q:n;
boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
dispatcher.listen(si);
+ //issue subscription request after listener is registered with dispatcher
+ si->subscribe();
return subscriptions[name] = Subscription(si.get());
}
@@ -53,6 +55,7 @@
std::string name=n.empty() ? q:n;
lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ si->subscribe();
lq.subscription = Subscription(si.get());
return subscriptions[name] = lq.subscription;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h Wed Nov 5 13:12:54 2008
@@ -30,6 +30,11 @@
/** Bring AMQP enum definitions for message class into this namespace. */
using namespace qpid::framing::message;
+enum CompletionMode {
+ MANUAL_COMPLETION = 0,
+ COMPLETE_ON_DELIVERY = 1,
+ COMPLETE_ON_ACCEPT = 2
+};
/**
* Settings for a subscription.
*/
@@ -40,8 +45,8 @@
AcceptMode accept=ACCEPT_MODE_EXPLICIT,
AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
unsigned int autoAck_=1,
- bool autoComplete_=true
- ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {}
+ CompletionMode completion=COMPLETE_ON_DELIVERY
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), completionMode(completion) {}
FlowControl flowControl; ///@< Flow control settings. @see FlowControl
AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
@@ -53,19 +58,28 @@
* ACCEPT_MODE_NODE.*/
unsigned int autoAck;
/**
- * If set to true, messages will be marked as completed (in
- * windowing mode, completion of a message will cause the credit
- * used up by that message to be reallocated) once they have been
- * received. The server will be explicitly notified of all
- * completed messages when the next accept is sent through the
+ * In windowing mode, completion of a message will cause the
+ * credit used up by that message to be reallocated. The
+ * subscriptions completion mode controls how completion is
+ * managed.
+ *
+ * If set to COMPLETE_ON_DELIVERY (which is the default), messages
+ * will be marked as completed once they have been received. The
+ * server will be explicitly notified of all completed messages
+ * for the session when the next accept is sent through the
* subscription (either explictly or through autAck). However the
* server may also periodically request information on the
* completed messages.
*
- * If set to false the application is responsible for completing
- * messages (@see Session::markCompleted()).
+ * If set to COMPLETE_ON_ACCEPT, messages will be marked as
+ * completed once they are accepted (via the Subscription class)
+ * and the server will also be notified of all completed messages
+ * for the session.
+ *
+ * If set to MANUAL_COMPLETION the application is responsible for
+ * completing messages (@see Session::markCompleted()).
*/
- bool autoComplete;
+ CompletionMode completionMode;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Nov 5 13:12:54 2008
@@ -296,6 +296,90 @@
}
}
+QPID_AUTO_TEST_CASE(testRelease) {
+ ClientSessionFixture fix;
+
+ const uint count=10;
+ for (uint i = 0; i < count; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ fix.subs.setAutoStop(false);
+ sys::Thread runner(fix.subs);//start dispatcher thread
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+
+ SimpleListener l1;
+ Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings);
+ l1.waitFor(count);
+ s1.cancel();
+
+ for (uint i = 0; i < count; i++) {
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData());
+ }
+ s1.release(s1.getUnaccepted());
+
+ //check that released messages are redelivered
+ settings.autoAck = 1;
+ SimpleListener l2;
+ Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings);
+ l2.waitFor(count);
+ for (uint i = 0; i < count; i++) {
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData());
+ }
+
+ fix.subs.stop();
+ runner.join();
+ fix.session.close();
+}
+
+QPID_AUTO_TEST_CASE(testCompleteOnAccept) {
+ ClientSessionFixture fix;
+
+ fix.session.queueDeclare(arg::queue="HELP_FIND_ME");
+
+ const uint count = 8;
+ const uint chunk = 4;
+ for (uint i = 0; i < count; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+ settings.completionMode = COMPLETE_ON_ACCEPT;
+ settings.flowControl = FlowControl::messageWindow(chunk);
+
+ LocalQueue q;
+ Subscription s = fix.subs.subscribe(q, "my-queue", settings);
+ fix.session.messageFlush(arg::destination=s.getName());
+ SequenceSet accepted;
+ for (uint i = 0; i < chunk; i++) {
+ Message m;
+ BOOST_CHECK(q.get(m));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ accepted.add(m.getId());
+ }
+ Message m;
+ BOOST_CHECK(!q.get(m));
+
+ s.accept(accepted);
+ fix.session.messageFlush(arg::destination=s.getName());
+ accepted.clear();
+
+ for (uint i = chunk; i < count; i++) {
+ Message m;
+ BOOST_CHECK(q.get(m));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ accepted.add(m.getId());
+ }
+ fix.session.messageAccept(accepted);
+
+ fix.session.queueDelete(arg::queue="HELP_FIND_ME");
+
+}
+
QPID_AUTO_TEST_SUITE_END()