You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/08/23 01:07:55 UTC
svn commit: r1619951 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp:
ConnectionContext.cpp ConnectionContext.h
Author: gsim
Date: Fri Aug 22 23:07:54 2014
New Revision: 1619951
URL: http://svn.apache.org/r1619951
Log:
QPID-6021: prevent proton's internal output buffer from growing too large
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1619951&r1=1619950&r2=1619951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Aug 22 23:07:54 2014
@@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(con
readHeader(false),
haveOutput(false),
state(DISCONNECTED),
- codecAdapter(*this)
+ codecAdapter(*this),
+ notifyOnWrite(false)
{
// Concatenate all known URLs into a single URL, get rid of duplicate addresses.
sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
@@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
+ while (pn_transport_pending(engine) > 65536) {
+ QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
+ notifyOnWrite = true;
+ wakeupDriver();
+ wait(ssn, snd);
+ notifyOnWrite = false;
+ }
while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
@@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePla
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
+ if (notifyOnWrite) lock.notifyAll();
return n;
} else if (n == PN_ERR) {
throw MessagingException(QPID_MSG("Error on output: " << getError()));
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1619951&r1=1619950&r2=1619951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Aug 22 23:07:54 2014
@@ -154,6 +154,7 @@ class ConnectionContext : public qpid::s
} state;
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
+ bool notifyOnWrite;
void check();
bool checkDisconnected();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org