You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/11/28 18:59:44 UTC

svn commit: r1546415 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp: ConnectionContext.cpp ReceiverContext.h

Author: gsim
Date: Thu Nov 28 17:59:44 2013
New Revision: 1546415

URL: http://svn.apache.org/r1546415
Log:
QPID-5378: track outstanding fetches and for receivers with zero capaicty, reissue credit correctly on reconnect

Modified:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1546415&r1=1546414&r2=1546415&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Nov 28 17:59:44 2013
@@ -140,6 +140,11 @@ void ConnectionContext::close()
 
 bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
+    /**
+     * For fetch() on a receiver with zero capacity, need to reissue the
+     * credit on reconnect, so track the fetches in progress.
+     */
+    qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching);
     {
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
         checkClosed(ssn, lnk);
@@ -535,7 +540,11 @@ void ConnectionContext::restartSession(b
     }
     for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) {
         QPID_LOG(debug, id << " reattaching receiver " << i->first);
-        attach(s, i->second->receiver, i->second->capacity);
+        if (i->second->capacity) {
+            attach(s, i->second->receiver, i->second->capacity);
+        } else {
+            attach(s, i->second->receiver, (uint32_t) i->second->fetching);
+        }
         i->second->verify();
         QPID_LOG(debug, id << " receiver " << i->first << " reattached");
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1546415&r1=1546414&r2=1546415&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Thu Nov 28 17:59:44 2013
@@ -24,6 +24,7 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/amqp/AddressHelper.h"
 #include <string>
+#include "qpid/sys/AtomicCount.h"
 #include "qpid/sys/IntegerTypes.h"
 
 struct pn_link_t;
@@ -65,6 +66,7 @@ class ReceiverContext
     AddressHelper helper;
     pn_link_t* receiver;
     uint32_t capacity;
+    qpid::sys::AtomicCount fetching;
     void configure(pn_terminus_t*);
 };
 }}} // namespace qpid::messaging::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org