You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/01/13 00:31:51 UTC

svn commit: r733966 - in /qpid/trunk/qpid/cpp/src/qpid/sys/windows: AsynchIO.cpp AsynchIoResult.h IoHandlePrivate.h IocpPoller.cpp

Author: shuston
Date: Mon Jan 12 15:31:49 2009
New Revision: 733966

URL: http://svn.apache.org/viewvc?rev=733966&view=rev
Log:
Add support for AsynchIO::RequestCallback processing

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Jan 12 15:31:49 2009
@@ -534,9 +534,22 @@
     return;
 }
 
-// TODO: This needs to arrange for a callback that is serialised with
-// the other IO callbacks for this AsynchIO
+// Queue the specified callback for invocation from an I/O thread.
 void AsynchIO::requestCallback(RequestCallback callback) {
+    // This method is generally called from a processing thread; transfer
+    // work on this to an I/O thread. Much of the upper layer code assumes
+    // that all I/O-related things happen in an I/O thread.
+    if (poller == 0)    // Not really going yet...
+        return;
+
+    InterlockedIncrement(&opsInProgress);
+    IOHandlePrivate *hp =
+        new IOHandlePrivate (INVALID_SOCKET,
+                             boost::bind(&AsynchIO::completion, this, _1),
+                             callback);
+    IOHandle h(hp);
+    PollerHandle ph(h);
+    poller->addFd(ph, Poller::INPUT);
 }
 
 /**
@@ -714,7 +727,13 @@
                 else {
                     AsynchWriteResult *w =
                         dynamic_cast<AsynchWriteResult*>(result);
-                    writeComplete(w);
+                    if (w != 0)
+                        writeComplete(w);
+                    else {
+                        AsynchCallbackRequest *req =
+                          dynamic_cast<AsynchCallbackRequest*>(result);
+                        req->reqCallback(*this);
+                    }
                 }
                 delete result;
                 result = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Mon Jan 12 15:31:49 2009
@@ -180,6 +180,23 @@
     }
 };
 
+class AsynchCallbackRequest : public AsynchIoResult {
+    // complete() needs to simply call the completionCallback; no buffers.
+    virtual void complete(void) {
+        completionCallback(this);
+    }
+
+public:
+    AsynchCallbackRequest(AsynchIoResult::Completer cb,
+                          AsynchIO::RequestCallback reqCb)
+      : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
+        wsabuf.buf = 0;
+        wsabuf.len = 0;
+    }
+
+    AsynchIO::RequestCallback reqCallback;
+};
+
 }}
 
 #endif  /*!_windows_asynchIoResult_h*/

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Mon Jan 12 15:31:49 2009
@@ -33,16 +33,20 @@
 // There should be either a valid socket handle or a completer callback.
 // Handle is used to associate with poller's iocp; completer is used to
 // inject a completion that will very quickly trigger a callback to the
-// completer from an I/O thread.
+// completer from an I/O thread. If the callback mechanism is used, there
+// can be a RequestCallback set - this carries the callback object through
+// from AsynchIO::requestCallback() through to the I/O completion processing.
 class IOHandlePrivate {
 public:
     IOHandlePrivate(SOCKET f = INVALID_SOCKET,
-                    AsynchIoResult::Completer cb = 0) :
-    fd(f), event(cb)
+                    AsynchIoResult::Completer cb = 0,
+                    AsynchIO::RequestCallback reqCallback = 0) :
+    fd(f), event(cb), cbRequest(reqCallback)
     {}
     
     SOCKET fd;
     AsynchIoResult::Completer event;
+    AsynchIO::RequestCallback cbRequest;
 };
 
 SOCKET toFd(const IOHandlePrivate* h);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=733966&r1=733965&r2=733966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Mon Jan 12 15:31:49 2009
@@ -43,16 +43,19 @@
 
     SOCKET fd;
     AsynchIoResult::Completer cb;
+    AsynchIO::RequestCallback cbRequest;
 
-    PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) :
-      fd(f), cb(cb0)
+    PollerHandlePrivate(SOCKET f,
+                        AsynchIoResult::Completer cb0 = 0,
+                        AsynchIO::RequestCallback rcb = 0)
+      : fd(f), cb(cb0), cbRequest(rcb)
     {
     }
     
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event))
+  impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event, h.impl->cbRequest))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -114,8 +117,19 @@
         QPID_WINDOWS_CHECK_NULL(iocpHandle);
     }
     else {
-        AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
-        PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+        // INPUT is used to request a callback; OUTPUT to request a write
+        assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
+
+        if (dir == Poller::OUTPUT) {
+            AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+            PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+        }
+        else {
+            AsynchCallbackRequest *result =
+                new AsynchCallbackRequest(handle.impl->cb,
+                                          handle.impl->cbRequest);
+            PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+        }
     }
 }