You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/31 20:43:41 UTC

svn commit: r1441163 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Consumer.h Queue.cpp

Author: aconway
Date: Thu Jan 31 19:43:40 2013
New Revision: 1441163

URL: http://svn.apache.org/viewvc?rev=1441163&view=rev
Log:
QPID-4555: Fix handling of no-credit consumers in Queue::getNextMessage.

This was discovered while investigating QPID-4555 but could affect any consumer
using bounded credit.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=1441163&r1=1441162&r2=1441163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Thu Jan 31 19:43:40 2013
@@ -86,6 +86,8 @@ class Consumer : public QueueCursor {
      */
     virtual bool isCounted() { return true; }
 
+    QueueCursor getCursor() const { return *this; }
+    void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; }
   protected:
     //framing::SequenceNumber position;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1441163&r1=1441162&r2=1441163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Jan 31 19:43:40 2013
@@ -366,7 +366,8 @@ bool Queue::getNextMessage(Message& m, C
     while (true) {
         //TODO: reduce lock scope
         Mutex::ScopedLock locker(messageLock);
-        Message* msg = messages->next(*c);
+        QueueCursor cursor = c->getCursor(); // Save current position.
+        Message* msg = messages->next(*c);   // Advances c.
         if (msg) {
             if (msg->hasExpired()) {
                 QPID_LOG(debug, "Message expired from queue '" << name << "'");
@@ -405,6 +406,7 @@ bool Queue::getNextMessage(Message& m, C
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
                     QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                    c->setCursor(cursor); // Restore cursor, will try again with credit
                     if (c->preAcquires()) {
                         //let someone else try
                         listeners.populate(set);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org