You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/23 17:56:51 UTC
svn commit: r1412961 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/amqp/Outgoing.cpp messaging/amqp/ConnectionContext.cpp
Author: gsim
Date: Fri Nov 23 16:56:50 2012
New Revision: 1412961
URL: http://svn.apache.org/viewvc?rev=1412961&view=rev
Log:
QPID-4460: Replenish credit to cover specified prefetch if it is drained
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1412961&r1=1412960&r2=1412961&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Fri Nov 23 16:56:50 2012
@@ -145,6 +145,7 @@ void Outgoing::detached()
bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
+ if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
pn_delivery(link, r.tag);
@@ -161,7 +162,7 @@ void Outgoing::notify()
bool Outgoing::accept(const qpid::broker::Message&)
{
- return canDeliver();
+ return true;
}
void Outgoing::setSubjectFilter(const std::string& f)
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1412961&r1=1412960&r2=1412961&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Nov 23 16:56:50 2012
@@ -188,6 +188,7 @@ bool ConnectionContext::fetch(boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ wakeupDriver();
}
return true;
} else {
@@ -195,12 +196,24 @@ bool ConnectionContext::fetch(boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
- QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+ while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait();
}
+ if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+ pn_link_flow(lnk->receiver, lnk->capacity);
+ }
+ }
+ if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ return false;
}
- return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org