You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/28 15:13:39 UTC

svn commit: r1414708 - in /qpid/branches/0.20/qpid/cpp/src/qpid: broker/amqp/Outgoing.cpp messaging/amqp/ConnectionContext.cpp

Author: gsim
Date: Wed Nov 28 14:13:38 2012
New Revision: 1414708

URL: http://svn.apache.org/viewvc?rev=1414708&view=rev
Log:
QPID-4460: Replenish credit to cover specified prefetch if it is drained

Modified:
    qpid/branches/0.20/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1414708&r1=1414707&r2=1414708&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Nov 28 14:13:38 2012
@@ -145,6 +145,7 @@ void Outgoing::detached()
 bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
 {
     Record& r = deliveries[current++];
+    if (current >= deliveries.capacity()) current = 0;
     r.cursor = cursor;
     r.msg = msg;
     pn_delivery(link, r.tag);
@@ -161,7 +162,7 @@ void Outgoing::notify()
 
 bool Outgoing::accept(const qpid::broker::Message&)
 {
-    return canDeliver();
+    return true;
 }
 
 void Outgoing::setSubjectFilter(const std::string& f)

Modified: qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1414708&r1=1414707&r2=1414708&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/0.20/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Nov 28 14:13:38 2012
@@ -188,6 +188,7 @@ bool ConnectionContext::fetch(boost::sha
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
         if (lnk->capacity) {
             pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+            wakeupDriver();
         }
         return true;
     } else {
@@ -195,12 +196,24 @@ bool ConnectionContext::fetch(boost::sha
             qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
             pn_link_drain(lnk->receiver, 0);
             wakeupDriver();
-            while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
-                QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+            while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+                QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
                 wait();
             }
+            if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+                pn_link_flow(lnk->receiver, lnk->capacity);
+            }
+        }
+        if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+            qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+            if (lnk->capacity) {
+                pn_link_flow(lnk->receiver, 1);
+                wakeupDriver();
+            }
+            return true;
+        } else {
+            return false;
         }
-        return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
     }
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org