You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/09/02 18:24:00 UTC

svn commit: r810591 - /qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Author: astitcher
Date: Wed Sep  2 16:24:00 2009
New Revision: 810591

URL: http://svn.apache.org/viewvc?rev=810591&view=rev
Log:
Change Async buffer returning logic to only watch reads when necessary

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=810591&r1=810590&r2=810591&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Sep  2 16:24:00 2009
@@ -305,6 +305,13 @@
      * thread processing this handle.
      */
     volatile bool writePending;
+    /**
+     * This records whether reading has been stopped (flow controlled):
+     * it's safe as a simple boolean because the only way to stop reading
+     * is via calls only allowed in the callback context, and the only calls
+     * checking it are also only allowed in the callback context.
+     */
+    volatile bool readingStopped;
 };
 
 AsynchIO::AsynchIO(const Socket& s,
@@ -323,7 +330,8 @@
     idleCallback(iCb),
     socket(s),
     queuedClose(false),
-    writePending(false) {
+    writePending(false),
+    readingStopped(false) {
 
     s.setNonblocking();
 }
@@ -351,8 +359,11 @@
     assert(buff);
     buff->dataStart = 0;
     buff->dataCount = 0;
+
+    bool queueWasEmpty = bufferQueue.empty();
     bufferQueue.push_back(buff);
-    DispatchHandle::rewatchRead();
+    if (queueWasEmpty && !readingStopped)
+        DispatchHandle::rewatchRead();
 }
 
 void AsynchIO::unread(BufferBase* buff) {
@@ -361,8 +372,11 @@
         memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
         buff->dataStart = 0;
     }
+
+    bool queueWasEmpty = bufferQueue.empty();
     bufferQueue.push_front(buff);
-    DispatchHandle::rewatchRead();
+    if (queueWasEmpty && !readingStopped)
+        DispatchHandle::rewatchRead();
 }
 
 void AsynchIO::queueWrite(BufferBase* buff) {
@@ -378,6 +392,7 @@
     DispatchHandle::rewatchWrite();
 }
 
+// This can happen outside the callback context
 void AsynchIO::notifyPendingWrite() {
     writePending = true;
     DispatchHandle::rewatchWrite();
@@ -392,11 +407,14 @@
     return writeQueue.empty();
 }
 
+// This can happen outside the callback context
 void AsynchIO::startReading() {
+    readingStopped = false;
     DispatchHandle::rewatchRead();
 }
 
 void AsynchIO::stopReading() {
+    readingStopped = true;
     DispatchHandle::unwatchRead();
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org