You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/05 22:12:55 UTC

svn commit: r711698 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/client/ tests/

Author: gsim
Date: Wed Nov  5 13:12:54 2008
New Revision: 711698

URL: http://svn.apache.org/viewvc?rev=711698&view=rev
Log:
Added ability to release messages through the Subscription class (+test)
Added another mode for managing completion (+test)
Fixed regression where bytes credit was not reallocated in windowing mode after an accept/release
Fixed regression where subscribe request is issued before listener is registered with dispatcher


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Nov  5 13:12:54 2008
@@ -43,7 +43,8 @@
                                                   cancelled(false),
                                                   completed(false),
                                                   ended(accepted),
-                                                  windowing(_windowing)
+                                                  windowing(_windowing),
+                                                  credit(msg.payload ? msg.payload->getRequiredCredit() : 0)
 {}
 
 void DeliveryRecord::setEnded()
@@ -153,7 +154,7 @@
 
 uint32_t DeliveryRecord::getCredit() const
 {
-    return msg.payload ? msg.payload->getRequiredCredit() : 0;
+    return credit;
 }
 
 void DeliveryRecord::acquire(DeliveryIds& results) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Nov  5 13:12:54 2008
@@ -64,6 +64,15 @@
     bool ended;
     const bool windowing;
 
+    /**
+     * Record required credit on construction as the pointer to the
+     * message may be reset once we no longer need to deliver it
+     * (e.g. when it is accepted), but we will still need to be able
+     * to reallocate credit when it is completed (which could happen
+     * after that).
+     */
+    const uint32_t credit;
+
   public:
     DeliveryRecord(
         const QueuedMessage& msg,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Nov  5 13:12:54 2008
@@ -48,12 +48,16 @@
     if (exceeded) {
         if (!policyExceeded) {
             policyExceeded = true;
-            QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+            if (m.queue) {
+                QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+            }
         }
     } else {
         if (policyExceeded) {
             policyExceeded = false;
-            QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+            if (m.queue) {
+                QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+            }
         }
     }
     return !exceeded;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp Wed Nov  5 13:12:54 2008
@@ -49,6 +49,11 @@
     impl->send(b).wait(*impl);
 }
 
+void SessionBase_0_10::markCompleted(const framing::SequenceSet& ids, bool notifyPeer)
+{
+    impl->markCompleted(ids, notifyPeer);
+}
+
 void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
 {
     impl->markCompleted(id, cumulative, notifyPeer);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h Wed Nov  5 13:12:54 2008
@@ -104,6 +104,7 @@
 
     Execution& getExecution();  
     void flush();
+    void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
     void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
     void sendCompletion();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Nov  5 13:12:54 2008
@@ -217,6 +217,16 @@
 
 };
 
+void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer)
+{
+    Lock l(state);
+    incompleteIn.remove(ids);
+    completedIn.add(ids);
+    if (notifyPeer) {
+        sendCompletion();
+    }    
+}
+
 void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
 {
     Lock l(state);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Wed Nov  5 13:12:54 2008
@@ -89,6 +89,7 @@
 
     Demux& getDemux();
     void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+    void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
     bool isComplete(const framing::SequenceNumber& id);
     bool isCompleteUpTo(const framing::SequenceNumber& id);
     void waitForCompletion(const framing::SequenceNumber& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Wed Nov  5 13:12:54 2008
@@ -38,6 +38,7 @@
 SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
 void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
 void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); }
 Session Subscription::getSession() const { return impl->getSession(); }
 SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
 void Subscription::cancel() { impl->cancel(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Wed Nov  5 13:12:54 2008
@@ -79,12 +79,20 @@
      */
     void accept(const SequenceSet& messageIds);
 
+    /** Release messageIds and remove them from the unaccepted set.
+     *@pre messageIds is a subset of getUnaccepted()
+     */
+    void release(const SequenceSet& messageIds);
+
     /* Acquire a single message */
     void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
 
     /* Accept a single message */
     void accept(const Message& m) { accept(SequenceSet(m.getId())); }
 
+    /* Release a single message */
+    void release(const Message& m) { release(SequenceSet(m.getId())); }
+
     /** Get the session associated with this subscription */
     Session getSession() const;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Wed Nov  5 13:12:54 2008
@@ -31,6 +31,9 @@
 
 SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
     : manager(m), name(n), queue(q), settings(s), listener(l)
+{}
+
+void SubscriptionImpl::subscribe()
 {
     async(manager.getSession()).messageSubscribe( 
         arg::queue=queue,
@@ -79,11 +82,25 @@
     Mutex::ScopedLock l(lock);
     manager.getSession().messageAccept(messageIds);
     unaccepted.remove(messageIds);
-    if (settings.autoComplete) {
+    switch (settings.completionMode) {
+      case COMPLETE_ON_ACCEPT:
+        manager.getSession().markCompleted(messageIds, true);
+        break;
+      case COMPLETE_ON_DELIVERY:
         manager.getSession().sendCompletion();
+        break;
+      default://do nothing
+        break;
     }
 }
 
+void SubscriptionImpl::release(const SequenceSet& messageIds) {
+    Mutex::ScopedLock l(lock);
+    manager.getSession().messageRelease(messageIds);
+    if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+        unaccepted.remove(messageIds);
+}
+
 Session SubscriptionImpl::getSession() const { return manager.getSession(); }
 
 SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
@@ -102,16 +119,23 @@
         listener->received(m);
     }
 
-    if (settings.autoComplete) {
+    if (settings.completionMode == COMPLETE_ON_DELIVERY) {
         manager.getSession().markCompleted(m.getId(), false, false);
     }
     if (settings.autoAck) {
         if (unaccepted.size() >= settings.autoAck) {
             async(manager.getSession()).messageAccept(unaccepted);
-            unaccepted.clear();
-            if (settings.autoComplete) {
+            switch (settings.completionMode) {
+              case COMPLETE_ON_ACCEPT:
+                manager.getSession().markCompleted(unaccepted, true);
+                break;
+              case COMPLETE_ON_DELIVERY:
                 manager.getSession().sendCompletion();
+                break;
+              default://do nothing
+                break;
             }
+            unaccepted.clear();
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Wed Nov  5 13:12:54 2008
@@ -70,15 +70,21 @@
     /** Acquire messageIds and remove them from the un-acquired set for the session. */
     void acquire(const SequenceSet& messageIds);
 
-    /** Accept messageIds and remove them from the un-acceptd set for the session. */
+    /** Accept messageIds and remove them from the un-accepted set for the session. */
     void accept(const SequenceSet& messageIds);
 
+    /** Release messageIds and remove them from the un-accepted set for the session. */
+    void release(const SequenceSet& messageIds);
+
     /** Get the session associated with this subscription */
     Session getSession() const;
 
     /** Get the subscription manager associated with this subscription */
     SubscriptionManager& getSubscriptionManager() const;
 
+    /** Send subscription request and issue appropriate flow control commands. */
+    void subscribe();
+
     /** Cancel the subscription. */
     void cancel();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov  5 13:12:54 2008
@@ -44,6 +44,8 @@
     std::string name=n.empty() ? q:n;
     boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
     dispatcher.listen(si);
+    //issue subscription request after listener is registered with dispatcher
+    si->subscribe();
     return subscriptions[name] = Subscription(si.get());
 }
 
@@ -53,6 +55,7 @@
     std::string name=n.empty() ? q:n;
     lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
     boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+    si->subscribe();
     lq.subscription = Subscription(si.get());
     return subscriptions[name] = lq.subscription;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionSettings.h Wed Nov  5 13:12:54 2008
@@ -30,6 +30,11 @@
 /** Bring AMQP enum definitions for message class into this namespace. */
 using namespace qpid::framing::message;
 
+enum CompletionMode {
+    MANUAL_COMPLETION = 0,
+    COMPLETE_ON_DELIVERY = 1,
+    COMPLETE_ON_ACCEPT = 2
+};
 /**
  * Settings for a subscription.
  */
@@ -40,8 +45,8 @@
         AcceptMode accept=ACCEPT_MODE_EXPLICIT,
         AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
         unsigned int autoAck_=1,
-        bool autoComplete_=true
-    ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {}
+        CompletionMode completion=COMPLETE_ON_DELIVERY
+    ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), completionMode(completion) {}
                          
     FlowControl flowControl;    ///@< Flow control settings. @see FlowControl
     AcceptMode acceptMode;      ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
@@ -53,19 +58,28 @@
      *  ACCEPT_MODE_NODE.*/
     unsigned int autoAck;
     /**
-     * If set to true, messages will be marked as completed (in
-     * windowing mode, completion of a message will cause the credit
-     * used up by that message to be reallocated) once they have been
-     * received. The server will be explicitly notified of all
-     * completed messages when the next accept is sent through the
+     * In windowing mode, completion of a message will cause the
+     * credit used up by that message to be reallocated. The
+     * subscriptions completion mode controls how completion is
+     * managed.
+     * 
+     * If set to COMPLETE_ON_DELIVERY (which is the default), messages
+     * will be marked as completed once they have been received. The
+     * server will be explicitly notified of all completed messages
+     * for the session when the next accept is sent through the
      * subscription (either explictly or through autAck). However the
      * server may also periodically request information on the
      * completed messages.
      * 
-     * If set to false the application is responsible for completing
-     * messages (@see Session::markCompleted()).
+     * If set to COMPLETE_ON_ACCEPT, messages will be marked as
+     * completed once they are accepted (via the Subscription class)
+     * and the server will also be notified of all completed messages
+     * for the session.
+     * 
+     * If set to MANUAL_COMPLETION the application is responsible for
+     * completing messages (@see Session::markCompleted()).
      */
-    bool autoComplete;
+    CompletionMode completionMode;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=711698&r1=711697&r2=711698&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Nov  5 13:12:54 2008
@@ -296,6 +296,90 @@
     }
 }
 
+QPID_AUTO_TEST_CASE(testRelease) {
+    ClientSessionFixture fix;
+
+    const uint count=10;
+    for (uint i = 0; i < count; i++) {        
+        Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");        
+        fix.session.messageTransfer(arg::content=m);
+    }
+
+    fix.subs.setAutoStop(false);
+    sys::Thread runner(fix.subs);//start dispatcher thread
+    SubscriptionSettings settings;
+    settings.autoAck = 0;
+
+    SimpleListener l1;
+    Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings);
+    l1.waitFor(count);
+    s1.cancel();
+
+    for (uint i = 0; i < count; i++) {
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData());
+    }
+    s1.release(s1.getUnaccepted());
+
+    //check that released messages are redelivered
+    settings.autoAck = 1;
+    SimpleListener l2;
+    Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings);
+    l2.waitFor(count);
+    for (uint i = 0; i < count; i++) {
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData());
+    }
+    
+    fix.subs.stop();
+    runner.join();
+    fix.session.close();
+}
+
+QPID_AUTO_TEST_CASE(testCompleteOnAccept) {
+    ClientSessionFixture fix;
+
+    fix.session.queueDeclare(arg::queue="HELP_FIND_ME");
+
+    const uint count = 8;
+    const uint chunk = 4;
+    for (uint i = 0; i < count; i++) {        
+        Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");        
+        fix.session.messageTransfer(arg::content=m);
+    }
+
+    SubscriptionSettings settings;
+    settings.autoAck = 0;
+    settings.completionMode = COMPLETE_ON_ACCEPT;
+    settings.flowControl = FlowControl::messageWindow(chunk);
+
+    LocalQueue q;
+    Subscription s = fix.subs.subscribe(q, "my-queue", settings);
+    fix.session.messageFlush(arg::destination=s.getName());
+    SequenceSet accepted;
+    for (uint i = 0; i < chunk; i++) {        
+        Message m;
+        BOOST_CHECK(q.get(m));
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+        accepted.add(m.getId());
+    }    
+    Message m;
+    BOOST_CHECK(!q.get(m));
+    
+    s.accept(accepted);
+    fix.session.messageFlush(arg::destination=s.getName());
+    accepted.clear();
+    
+    for (uint i = chunk; i < count; i++) {        
+        Message m;
+        BOOST_CHECK(q.get(m));
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+        accepted.add(m.getId());
+    }    
+    fix.session.messageAccept(accepted);
+
+    fix.session.queueDelete(arg::queue="HELP_FIND_ME");
+
+}
+
 QPID_AUTO_TEST_SUITE_END()