You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/10/20 19:10:06 UTC

svn commit: r827735 - in /qpid/trunk/qpid/cpp: include/qpid/sys/ src/qpid/broker/ src/qpid/sys/windows/

Author: shuston
Date: Tue Oct 20 17:10:05 2009
New Revision: 827735

URL: http://svn.apache.org/viewvc?rev=827735&view=rev
Log:
Carry over recent AsynchIO-level changes to Windows.

Modified:
    qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h Tue Oct 20 17:10:05 2009
@@ -35,6 +35,8 @@
 class AsynchAcceptorPrivate;
 class AsynchAcceptResult;
 namespace windows {
+    class AsynchAcceptor;
+    class AsynchAcceptResult;
     class AsynchIO;
 }
 
@@ -43,8 +45,8 @@
 class IOHandlePrivate;
 class IOHandle {
 
-    friend class AsynchAcceptorPrivate;
-    friend class AsynchAcceptResult;
+    friend class windows::AsynchAcceptResult;
+    friend class windows::AsynchAcceptor;
     friend class windows::AsynchIO;
 
     friend class PollerHandle;
@@ -52,7 +54,7 @@
 protected:
     IOHandlePrivate* const impl;
 
-	IOHandle(IOHandlePrivate*);
+    IOHandle(IOHandlePrivate*);
     QPID_COMMON_EXTERN virtual ~IOHandle();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Oct 20 17:10:05 2009
@@ -201,7 +201,7 @@
     void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
 
     /** Accept connections */
-    void accept();
+    QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
     void connect(const std::string& host, uint16_t port, 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Oct 20 17:10:05 2009
@@ -74,6 +74,7 @@
 
 namespace qpid {
 namespace sys {
+namespace windows {
 
 /*
  * Asynch Acceptor
@@ -88,13 +89,13 @@
  * and status of each accept operation outstanding.
  */
 
-class AsynchAcceptorPrivate {
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
 
     friend class AsynchAcceptResult;
 
 public:
-    AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
-    ~AsynchAcceptorPrivate();
+    AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+    ~AsynchAcceptor();
     void start(Poller::shared_ptr poller);
 
 private:
@@ -104,19 +105,7 @@
     const Socket& socket;
 };
 
-AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
-  impl(new AsynchAcceptorPrivate(s, callback))
-{}
-
-AsynchAcceptor::~AsynchAcceptor()
-{ delete impl; }
-
-void AsynchAcceptor::start(Poller::shared_ptr poller) {
-    impl->start(poller);
-}
-
-AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
-                                             AsynchAcceptor::Callback callback)
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
   : acceptedCallback(callback),
     socket(s) {
 
@@ -128,16 +117,17 @@
 #endif
 }
 
-AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) {
+AsynchAcceptor::~AsynchAcceptor()
+{
     socket.close();
 }
 
-void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
     poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
     restart ();
 }
 
-void AsynchAcceptorPrivate::restart(void) {
+void AsynchAcceptor::restart(void) {
     DWORD bytesReceived = 0;  // Not used, needed for AcceptEx API
     AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
                                                         this,
@@ -156,7 +146,7 @@
 
 
 AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
-                                       AsynchAcceptorPrivate *acceptor,
+                                       AsynchAcceptor *acceptor,
                                        SOCKET listener)
   : callback(cb), acceptor(acceptor), listener(listener) {
     newSocket.reset (new Socket());
@@ -174,13 +164,11 @@
 }
 
 void AsynchAcceptResult::failure(int status) {
-  //if (status != WSA_OPERATION_ABORTED)
-  // Can there be anything else?  ;
-  delete this;
+    //if (status != WSA_OPERATION_ABORTED)
+    // Can there be anything else?  ;
+    delete this;
 }
 
-namespace windows {
-
 /*
  * AsynchConnector does synchronous connects for now... to do asynch the
  * IocpPoller will need some extension to register an event handle as a
@@ -224,6 +212,12 @@
 
 } // namespace windows
 
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s, 
+                                       Callback callback)
+{
+    return new windows::AsynchAcceptor(s, callback);
+}
+
 AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
                                                     Poller::shared_ptr poller,
                                                     std::string hostname,
@@ -231,12 +225,12 @@
                                                     ConnectedCallback connCb,
                                                     FailedCallback failCb)
 {
-    return new qpid::sys::windows::AsynchConnector(s,
-                                                   poller,
-                                                   hostname,
-                                                   port,
-                                                   connCb,
-                                                   failCb);
+    return new windows::AsynchConnector(s,
+                                        poller,
+                                        hostname,
+                                        port,
+                                        connCb,
+                                        failCb);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Tue Oct 20 17:10:05 2009
@@ -30,6 +30,7 @@
 
 namespace qpid {
 namespace sys {
+namespace windows {
 
 /*
  * AsynchIoResult defines the class that receives the result of an
@@ -73,14 +74,13 @@
     int status;
 };
 
-class AsynchAcceptorPrivate;
 class AsynchAcceptResult : public AsynchResult {
 
-    friend class AsynchAcceptorPrivate;
+    friend class AsynchAcceptor;
 
 public:
-    AsynchAcceptResult(AsynchAcceptor::Callback cb,
-                       AsynchAcceptorPrivate *acceptor,
+    AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
+                       AsynchAcceptor *acceptor,
                        SOCKET listener);
     virtual void success (size_t bytesTransferred);
     virtual void failure (int error);
@@ -89,8 +89,8 @@
     virtual void complete(void) {}  // No-op for this class.
 
     std::auto_ptr<qpid::sys::Socket> newSocket;
-    AsynchAcceptor::Callback callback;
-    AsynchAcceptorPrivate *acceptor;
+    qpid::sys::AsynchAcceptor::Callback callback;
+    AsynchAcceptor *acceptor;
     SOCKET listener;
 
     // AcceptEx needs a place to write the local and remote addresses
@@ -106,16 +106,16 @@
     typedef boost::function1<void, AsynchIoResult *> Completer;
 
     virtual ~AsynchIoResult() {}
-    AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+    qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
     size_t getRequested(void) const { return requested; }
     const WSABUF *getWSABUF(void) const { return &wsabuf; }
 
 protected:
-    void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+    void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; }
 
 protected:
     AsynchIoResult(Completer cb,
-                   AsynchIO::BufferBase *buff, size_t length)
+                   qpid::sys::AsynchIO::BufferBase *buff, size_t length)
       : completionCallback(cb), iobuff(buff), requested(length) {}
 
     virtual void complete(void) = 0;
@@ -123,7 +123,7 @@
     Completer completionCallback;
 
 private:
-    AsynchIO::BufferBase *iobuff;
+    qpid::sys::AsynchIO::BufferBase *iobuff;
     size_t  requested;     // Number of bytes in original I/O request
 };
 
@@ -137,7 +137,7 @@
 
 public:
     AsynchReadResult(AsynchIoResult::Completer cb,
-                     AsynchIO::BufferBase *buff,
+                     qpid::sys::AsynchIO::BufferBase *buff,
                      size_t length)
       : AsynchIoResult(cb, buff, length) {
         wsabuf.buf = buff->bytes + buff->dataCount;
@@ -149,7 +149,7 @@
 
     // complete() updates buffer then does completion callback.
     virtual void complete(void) {
-        AsynchIO::BufferBase *b = getBuff();
+        qpid::sys::AsynchIO::BufferBase *b = getBuff();
         b->dataStart += bytes;
         b->dataCount -= bytes;
         completionCallback(this);
@@ -157,7 +157,7 @@
 
 public:
     AsynchWriteResult(AsynchIoResult::Completer cb,
-                      AsynchIO::BufferBase *buff,
+                      qpid::sys::AsynchIO::BufferBase *buff,
                       size_t length)
       : AsynchIoResult(cb, buff, length) {
         wsabuf.buf = buff ? buff->bytes : 0;
@@ -188,15 +188,15 @@
 
 public:
     AsynchCallbackRequest(AsynchIoResult::Completer cb,
-                          AsynchIO::RequestCallback reqCb)
+                          qpid::sys::AsynchIO::RequestCallback reqCb)
       : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
         wsabuf.buf = 0;
         wsabuf.len = 0;
     }
 
-    AsynchIO::RequestCallback reqCallback;
+    qpid::sys::AsynchIO::RequestCallback reqCallback;
 };
 
-}}
+}}}  // qpid::sys::windows
 
 #endif  /*!_windows_asynchIoResult_h*/

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Tue Oct 20 17:10:05 2009
@@ -22,6 +22,7 @@
  *
  */
 
+#include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/windows/AsynchIoResult.h"
 #include "qpid/CommonImportExport.h"
 
@@ -40,13 +41,13 @@
 class IOHandlePrivate {
 public:
     IOHandlePrivate(SOCKET f = INVALID_SOCKET,
-                    AsynchIoResult::Completer cb = 0,
+                    windows::AsynchIoResult::Completer cb = 0,
                     AsynchIO::RequestCallback reqCallback = 0) :
     fd(f), event(cb), cbRequest(reqCallback)
     {}
     
     SOCKET fd;
-    AsynchIoResult::Completer event;
+    windows::AsynchIoResult::Completer event;
     AsynchIO::RequestCallback cbRequest;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Tue Oct 20 17:10:05 2009
@@ -42,11 +42,11 @@
     friend class PollerHandle;
 
     SOCKET fd;
-    AsynchIoResult::Completer cb;
+    windows::AsynchIoResult::Completer cb;
     AsynchIO::RequestCallback cbRequest;
 
     PollerHandlePrivate(SOCKET f,
-                        AsynchIoResult::Completer cb0 = 0,
+                        windows::AsynchIoResult::Completer cb0 = 0,
                         AsynchIO::RequestCallback rcb = 0)
       : fd(f), cb(cb0), cbRequest(rcb)
     {
@@ -133,13 +133,14 @@
         assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
 
         if (dir == Poller::OUTPUT) {
-            AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+            windows::AsynchWriteWanted *result =
+                new windows::AsynchWriteWanted(handle.impl->cb);
             PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
         }
         else {
-            AsynchCallbackRequest *result =
-                new AsynchCallbackRequest(handle.impl->cb,
-                                          handle.impl->cbRequest);
+            windows::AsynchCallbackRequest *result =
+                new windows::AsynchCallbackRequest(handle.impl->cb,
+                                                   handle.impl->cbRequest);
             PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
         }
     }
@@ -155,7 +156,7 @@
     DWORD numTransferred = 0;
     ULONG_PTR completionKey = 0;
     OVERLAPPED *overlapped = 0;
-    AsynchResult *result = 0;
+    windows::AsynchResult *result = 0;
 
     // Wait for either an I/O operation to finish (thus signaling the
     // IOCP handle) or a shutdown request to be made (thus signaling the
@@ -185,7 +186,7 @@
             return Event(0, SHUTDOWN);
         }
 
-        result = AsynchResult::from_overlapped(overlapped);
+        result = windows::AsynchResult::from_overlapped(overlapped);
         result->success (static_cast<size_t>(numTransferred));
     }
     else {
@@ -193,7 +194,7 @@
         // Dequeued a completion for a failed operation. Downcast back
         // to the result object and inform it that the operation failed.
         DWORD status = ::GetLastError();
-        result = AsynchResult::from_overlapped(overlapped);
+        result = windows::AsynchResult::from_overlapped(overlapped);
         result->failure (static_cast<int>(status));
       }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Tue Oct 20 17:10:05 2009
@@ -47,8 +47,8 @@
                              const boost::shared_ptr<sys::Poller>& poller);
     ~PollableConditionPrivate();
 
-    void poke();
-    void dispatch(AsynchIoResult *result);
+  void poke();
+  void dispatch(windows::AsynchIoResult *result);
 
 private:
     PollableCondition::Callback cb;
@@ -82,7 +82,7 @@
     poller->monitorHandle(ph, Poller::INPUT);
 }
 
-void PollableConditionPrivate::dispatch(AsynchIoResult *result)
+void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result)
 {
     delete result;       // Poller::monitorHandle() allocates this
     cb(parent);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp Tue Oct 20 17:10:05 2009
@@ -135,7 +135,9 @@
 }  // namespace
 
 Socket::Socket() :
-	IOHandle(new IOHandlePrivate)
+    IOHandle(new IOHandlePrivate),
+    nonblocking(false),
+    nodelay(false)
 {
     SOCKET& socket = impl->fd;
     if (socket != INVALID_SOCKET) Socket::close();
@@ -145,7 +147,9 @@
 }
 
 Socket::Socket(IOHandlePrivate* h) :
-	IOHandle(h)
+    IOHandle(h),
+    nonblocking(false),
+    nodelay(false)
 {}
 
 void
@@ -162,6 +166,7 @@
 
     try {
         if (nonblocking) setNonblocking();
+        if (nodelay) setTcpNoDelay();
     } catch (std::exception&) {
         closesocket(s);
         socket = INVALID_SOCKET;
@@ -313,17 +318,16 @@
     return result;
 }
 
-void Socket::setTcpNoDelay(bool nodelay) const
+void Socket::setTcpNoDelay() const
 {
-    if (nodelay) {
-        int flag = 1;
-        int result = setsockopt(impl->fd,
-                                IPPROTO_TCP,
-                                TCP_NODELAY,
-                                (char *)&flag,
-                                sizeof(flag));
-        QPID_WINSOCK_CHECK(result);
-    }
+    int flag = 1;
+    int result = setsockopt(impl->fd,
+                            IPPROTO_TCP,
+                            TCP_NODELAY,
+                            (char *)&flag,
+                            sizeof(flag));
+    QPID_WINSOCK_CHECK(result);
+    nodelay = true;
 }
 
 }} // namespace qpid::sys



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org