You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/11/28 18:59:44 UTC
svn commit: r1546415 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp:
ConnectionContext.cpp ReceiverContext.h
Author: gsim
Date: Thu Nov 28 17:59:44 2013
New Revision: 1546415
URL: http://svn.apache.org/r1546415
Log:
QPID-5378: track outstanding fetches and for receivers with zero capacity, reissue credit correctly on reconnect
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1546415&r1=1546414&r2=1546415&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Nov 28 17:59:44 2013
@@ -140,6 +140,11 @@ void ConnectionContext::close()
bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
+ /**
+ * For fetch() on a receiver with zero capacity, need to reissue the
+ * credit on reconnect, so track the fetches in progress.
+ */
+ qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching);
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn, lnk);
@@ -535,7 +540,11 @@ void ConnectionContext::restartSession(b
}
for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) {
QPID_LOG(debug, id << " reattaching receiver " << i->first);
- attach(s, i->second->receiver, i->second->capacity);
+ if (i->second->capacity) {
+ attach(s, i->second->receiver, i->second->capacity);
+ } else {
+ attach(s, i->second->receiver, (uint32_t) i->second->fetching);
+ }
i->second->verify();
QPID_LOG(debug, id << " receiver " << i->first << " reattached");
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1546415&r1=1546414&r2=1546415&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Thu Nov 28 17:59:44 2013
@@ -24,6 +24,7 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/AddressHelper.h"
#include <string>
+#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/IntegerTypes.h"
struct pn_link_t;
@@ -65,6 +66,7 @@ class ReceiverContext
AddressHelper helper;
pn_link_t* receiver;
uint32_t capacity;
+ qpid::sys::AtomicCount fetching;
void configure(pn_terminus_t*);
};
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org