You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/08/23 01:07:55 UTC

svn commit: r1619951 - in /qpid/trunk/qpid/cpp/src/qpid/messaging/amqp: ConnectionContext.cpp ConnectionContext.h

Author: gsim
Date: Fri Aug 22 23:07:54 2014
New Revision: 1619951

URL: http://svn.apache.org/r1619951
Log:
QPID-6021: prevent Proton's internal output buffer from growing too large

Modified:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1619951&r1=1619950&r2=1619951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Aug 22 23:07:54 2014
@@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(con
       readHeader(false),
       haveOutput(false),
       state(DISCONNECTED),
-      codecAdapter(*this)
+      codecAdapter(*this),
+      notifyOnWrite(false)
 {
     // Concatenate all known URLs into a single URL, get rid of duplicate addresses.
     sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
@@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shar
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     checkClosed(ssn);
     SenderContext::Delivery* delivery(0);
+    while (pn_transport_pending(engine) > 65536) {
+        QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
+        notifyOnWrite = true;
+        wakeupDriver();
+        wait(ssn, snd);
+        notifyOnWrite = false;
+    }
     while (!snd->send(message, &delivery)) {
         QPID_LOG(debug, "Waiting for capacity...");
         wait(ssn, snd);//wait for capacity
@@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePla
     if (n > 0) {
         QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
         haveOutput = true;
+        if (notifyOnWrite) lock.notifyAll();
         return n;
     } else if (n == PN_ERR) {
         throw MessagingException(QPID_MSG("Error on output: " << getError()));

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1619951&r1=1619950&r2=1619951&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Aug 22 23:07:54 2014
@@ -154,6 +154,7 @@ class ConnectionContext : public qpid::s
     } state;
     std::auto_ptr<Sasl> sasl;
     CodecAdapter codecAdapter;
+    bool notifyOnWrite;
 
     void check();
     bool checkDisconnected();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org