You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/01/13 00:31:51 UTC
svn commit: r733966 - in /qpid/trunk/qpid/cpp/src/qpid/sys/windows:
AsynchIO.cpp AsynchIoResult.h IoHandlePrivate.h IocpPoller.cpp
Author: shuston
Date: Mon Jan 12 15:31:49 2009
New Revision: 733966
URL: http://svn.apache.org/viewvc?rev=733966&view=rev
Log:
Add support for AsynchIO::RequestCallback processing
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Jan 12 15:31:49 2009
@@ -534,9 +534,22 @@
return;
}
-// TODO: This needs to arrange for a callback that is serialised with
-// the other IO callbacks for this AsynchIO
+// Queue the specified callback for invocation from an I/O thread. The request is silently dropped when no poller is set yet.
void AsynchIO::requestCallback(RequestCallback callback) {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet... so the callback is never queued
+ return;
+
+ InterlockedIncrement(&opsInProgress); // count the request as an operation in progress; NOTE(review): matching decrement is not visible in this hunk
+ IOHandlePrivate *hp = // socket-less handle: INVALID_SOCKET, a completer bound to this->completion, and the caller's callback
+ new IOHandlePrivate (INVALID_SOCKET,
+ boost::bind(&AsynchIO::completion, this, _1),
+ callback);
+ IOHandle h(hp); // NOTE(review): ownership of hp is not visible here - confirm who deletes it
+ PollerHandle ph(h);
+ poller->addFd(ph, Poller::INPUT); // IocpPoller uses INPUT to mean "request a callback"
}
/**
@@ -714,7 +727,13 @@
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
- writeComplete(w);
+ if (w != 0)
+ writeComplete(w);
+ else {
+ AsynchCallbackRequest *req =
+ dynamic_cast<AsynchCallbackRequest*>(result);
+ req->reqCallback(*this);
+ }
}
delete result;
result = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Mon Jan 12 15:31:49 2009
@@ -180,6 +180,23 @@
}
};
+class AsynchCallbackRequest : public AsynchIoResult { // result object that carries a RequestCallback instead of I/O buffers
+ // complete() needs to simply call the completionCallback; no buffers.
+ virtual void complete(void) {
+ completionCallback(this); // passes this result object to completionCallback
+ }
+
+public:
+ AsynchCallbackRequest(AsynchIoResult::Completer cb, // cb: completer forwarded to the AsynchIoResult base
+ AsynchIO::RequestCallback reqCb) // reqCb: callback stored in reqCallback
+ : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
+ wsabuf.buf = 0; // a callback request has no data buffer, so the buffer fields are zeroed
+ wsabuf.len = 0;
+ }
+
+ AsynchIO::RequestCallback reqCallback; // invoked as reqCallback(*this) from the AsynchIO completion path
+};
+
}}
#endif /*!_windows_asynchIoResult_h*/
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Mon Jan 12 15:31:49 2009
@@ -33,16 +33,20 @@
// There should be either a valid socket handle or a completer callback.
// Handle is used to associate with poller's iocp; completer is used to
// inject a completion that will very quickly trigger a callback to the
-// completer from an I/O thread.
+// completer from an I/O thread. If the callback mechanism is used, there
+// can be a RequestCallback set - this carries the callback object from
+// AsynchIO::requestCallback() through to the I/O completion processing.
class IOHandlePrivate {
public:
IOHandlePrivate(SOCKET f = INVALID_SOCKET, // every argument is optional; the defaults are no socket and null callbacks
- AsynchIoResult::Completer cb = 0) :
- fd(f), event(cb)
+ AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f), event(cb), cbRequest(reqCallback)
{}
SOCKET fd; // socket handle, or INVALID_SOCKET when only a completer is used
AsynchIoResult::Completer event; // completer to be called back from an I/O thread
+ AsynchIO::RequestCallback cbRequest; // optional callback carried from AsynchIO::requestCallback()
};
SOCKET toFd(const IOHandlePrivate* h);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Mon Jan 12 15:31:49 2009
@@ -43,16 +43,19 @@
SOCKET fd;
AsynchIoResult::Completer cb;
+ AsynchIO::RequestCallback cbRequest;
- PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) :
- fd(f), cb(cb0)
+ PollerHandlePrivate(SOCKET f,
+ AsynchIoResult::Completer cb0 = 0,
+ AsynchIO::RequestCallback rcb = 0)
+ : fd(f), cb(cb0), cbRequest(rcb)
{
}
};
PollerHandle::PollerHandle(const IOHandle& h) : // builds the poller-side private data from the IOHandle's private data
- impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event))
+ impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event, h.impl->cbRequest)) // copies the socket, the completer and the requested callback
{}
PollerHandle::~PollerHandle() {
@@ -114,8 +117,19 @@
QPID_WINDOWS_CHECK_NULL(iocpHandle);
}
else {
- AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
- PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ // INPUT is used to request a callback; OUTPUT to request a write
+ assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
+
+ if (dir == Poller::OUTPUT) {
+ AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ else {
+ AsynchCallbackRequest *result =
+ new AsynchCallbackRequest(handle.impl->cb,
+ handle.impl->cbRequest);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
}
}