You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/09/02 18:24:00 UTC
svn commit: r810591 - /qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
Author: astitcher
Date: Wed Sep 2 16:24:00 2009
New Revision: 810591
URL: http://svn.apache.org/viewvc?rev=810591&view=rev
Log:
Change Async buffer returning logic to only watch reads when necessary
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=810591&r1=810590&r2=810591&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Sep 2 16:24:00 2009
@@ -305,6 +305,13 @@
* thread processing this handle.
*/
volatile bool writePending;
+ /**
+ * This records whether reading has been stopped for flow control:
+ * it's safe as a simple boolean because the only way to be stopped
+ * is in calls only allowed in the callback context, and the only calls
+ * checking it are also only allowed in the callback context.
+ */
+ volatile bool readingStopped;
};
AsynchIO::AsynchIO(const Socket& s,
@@ -323,7 +330,8 @@
idleCallback(iCb),
socket(s),
queuedClose(false),
- writePending(false) {
+ writePending(false),
+ readingStopped(false) {
s.setNonblocking();
}
@@ -351,8 +359,11 @@
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_back(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::unread(BufferBase* buff) {
@@ -361,8 +372,11 @@
memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
buff->dataStart = 0;
}
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_front(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::queueWrite(BufferBase* buff) {
@@ -378,6 +392,7 @@
DispatchHandle::rewatchWrite();
}
+// This can happen outside the callback context
void AsynchIO::notifyPendingWrite() {
writePending = true;
DispatchHandle::rewatchWrite();
@@ -392,11 +407,14 @@
return writeQueue.empty();
}
+// This can happen outside the callback context
void AsynchIO::startReading() {
+ readingStopped = false;
DispatchHandle::rewatchRead();
}
void AsynchIO::stopReading() {
+ readingStopped = true;
DispatchHandle::unwatchRead();
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org