You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2008/10/21 20:29:51 UTC

svn commit: r706709 [1/2] - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/ qpid/log/ qpid/sys/ qpid/sys/epoll/ qpid/sys/posix/ qpid/sys/solaris/ qpid/sys/windows/ tests/

Author: shuston
Date: Tue Oct 21 11:29:44 2008
New Revision: 706709

URL: http://svn.apache.org/viewvc?rev=706709&view=rev
Log:
Refactor sys::AsynchIO class to allow reimplementing on other platforms without affecting upper level usage. Resolves QPID-1377 and supplies Windows AsynchIO.cpp

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Oct 21 11:29:44 2008
@@ -283,6 +283,7 @@
   qpid/sys/AggregateOutput.cpp \
   qpid/sys/AsynchIOHandler.cpp \
   qpid/sys/Dispatcher.cpp \
+  qpid/sys/DispatchHandle.cpp \
   qpid/sys/PollableCondition.h \
   qpid/sys/PollableQueue.h \
   qpid/sys/Runnable.cpp \
@@ -609,6 +610,7 @@
   qpid/sys/ConnectionOutputHandlerPtr.h \
   qpid/sys/DeletionManager.h \
   qpid/sys/Dispatcher.h \
+  qpid/sys/DispatchHandle.h \
   qpid/sys/FileSysDir.h \
   qpid/sys/IntegerTypes.h \
   qpid/sys/IOHandle.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Tue Oct 21 11:29:44 2008
@@ -22,7 +22,6 @@
 
 #include "qpid/log/Logger.h"
 #include "qpid/sys/Socket.h"
-#include <sys/socket.h>
 
 namespace qpid {
 namespace client {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Oct 21 11:29:44 2008
@@ -200,7 +200,7 @@
     identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     closed = false;
     poller = Poller::shared_ptr(new Poller);
-    aio = new AsynchIO(socket,
+    aio = AsynchIO::create(socket,
                        boost::bind(&TCPConnector::readbuff, this, _1, _2),
                        boost::bind(&TCPConnector::eof, this, _1),
                        boost::bind(&TCPConnector::eof, this, _1),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Tue Oct 21 11:29:44 2008
@@ -24,7 +24,7 @@
 
 namespace qpid {
 namespace log {
-class Options;
+struct Options;
 
 /**
  * A selector identifies the set of log messages to enable.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Tue Oct 21 11:29:44 2008
@@ -21,7 +21,8 @@
  *
  */
 
-#include "Dispatcher.h"
+// @@TODO: TAKE THIS OUT... Should be in posix version.
+#include "DispatchHandle.h"
 
 #include <boost/function.hpp>
 #include <deque>
@@ -35,48 +36,45 @@
  * Asynchronous acceptor: accepts connections then does a callback with the
  * accepted fd
  */
+class AsynchAcceptorPrivate;
 class AsynchAcceptor {
 public:
     typedef boost::function1<void, const Socket&> Callback;
 
 private:
-    Callback acceptedCallback;
-    DispatchHandle handle;
-    const Socket& socket;
+    AsynchAcceptorPrivate* impl;
 
 public:
     AsynchAcceptor(const Socket& s, Callback callback);
+    ~AsynchAcceptor();
     void start(Poller::shared_ptr poller);
-
-private:
-    void readable(DispatchHandle& handle);
 };
 
 /*
  * Asynchronous connector: starts the process of initiating a connection and
  * invokes a callback when completed or failed.
  */
-class AsynchConnector : private DispatchHandle {
+class AsynchConnector {
 public:
     typedef boost::function1<void, const Socket&> ConnectedCallback;
     typedef boost::function2<void, int, std::string> FailedCallback;
 
-private:
-    ConnectedCallback connCallback;
-    FailedCallback failCallback;
-    const Socket& socket;
-
-public:
-    AsynchConnector(const Socket& socket,
-                    Poller::shared_ptr poller,
-                    std::string hostname,
-                    uint16_t port,
-                    ConnectedCallback connCb,
-                    FailedCallback failCb = 0);
-
-private:
-    void connComplete(DispatchHandle& handle);
-    void failure(int, std::string);
+    // Call create() to allocate a new AsynchConnector object with the
+    // specified poller, addressing, and callbacks.
+    // This method is implemented in platform-specific code to
+    // create a correctly typed object. The platform code also manages
+    // deletes. To correctly manage heaps when needed, the allocate and
+    // delete should both be done from the same class/library.
+    static AsynchConnector* create(const Socket& s,
+                                   Poller::shared_ptr poller,
+                                   std::string hostname,
+                                   uint16_t port,
+                                   ConnectedCallback connCb,
+                                   FailedCallback failCb = 0);
+
+protected:
+    AsynchConnector() {}
+    virtual ~AsynchConnector() {}
 };
 
 struct AsynchIOBufferBase {
@@ -99,16 +97,14 @@
 /*
  * Asychronous reader/writer: 
  * Reader accepts buffers to read into; reads into the provided buffers
- * and then does a callback with the buffer and amount read. Optionally it can callback
- * when there is something to read but no buffer to read it into.
+ * and then does a callback with the buffer and amount read. Optionally it
+ * can callback when there is something to read but no buffer to read it into.
  * 
  * Writer accepts a buffer and queues it for writing; can also be given
- * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
- * 
- * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
- * the contained DispatchHandle
+ * a callback for when writing is "idle" (ie fd is writable, but nothing
+ * to write).
  */
-class AsynchIO : private DispatchHandle {
+class AsynchIO {
 public:
     typedef AsynchIOBufferBase BufferBase;
 
@@ -119,46 +115,35 @@
     typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
     typedef boost::function1<void, AsynchIO&> IdleCallback;
 
-private:
-    ReadCallback readCallback;
-    EofCallback eofCallback;
-    DisconnectCallback disCallback;
-    ClosedCallback closedCallback;
-    BuffersEmptyCallback emptyCallback;
-    IdleCallback idleCallback;
-    const Socket& socket;
-    std::deque<BufferBase*> bufferQueue;
-    std::deque<BufferBase*> writeQueue;
-    bool queuedClose;
-    /**
-     * This flag is used to detect and handle concurrency between
-     * calls to notifyPendingWrite() (which can be made from any thread) and
-     * the execution of the writeable() method (which is always on the
-     * thread processing this handle.
-     */
-    volatile bool writePending;
-
-public:
-    AsynchIO(const Socket& s,
-        ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-        ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
-    void queueForDeletion();
-
-    void start(Poller::shared_ptr poller);
-    void queueReadBuffer(BufferBase* buff);
-    void unread(BufferBase* buff);
-    void queueWrite(BufferBase* buff);
-    void notifyPendingWrite();
-    void queueWriteClose();
-    bool writeQueueEmpty() { return writeQueue.empty(); }
-    BufferBase* getQueuedBuffer();
-
-private:
-    ~AsynchIO();
-    void readable(DispatchHandle& handle);
-    void writeable(DispatchHandle& handle);
-    void disconnected(DispatchHandle& handle);
-    void close(DispatchHandle& handle);
+    // Call create() to allocate a new AsynchIO object with the specified
+    // callbacks. This method is implemented in platform-specific code to
+    // create a correctly typed object. The platform code also manages
+    // deletes. To correctly manage heaps when needed, the allocate and
+    // delete should both be done from the same class/library.
+    static AsynchIO* create(const Socket& s,
+                            ReadCallback rCb,
+                            EofCallback eofCb,
+                            DisconnectCallback disCb,
+                            ClosedCallback cCb = 0,
+                            BuffersEmptyCallback eCb = 0,
+                            IdleCallback iCb = 0);
+public:
+    virtual void queueForDeletion() = 0;
+
+    virtual void start(Poller::shared_ptr poller) = 0;
+    virtual void queueReadBuffer(BufferBase* buff) = 0;
+    virtual void unread(BufferBase* buff) = 0;
+    virtual void queueWrite(BufferBase* buff) = 0;
+    virtual void notifyPendingWrite() = 0;
+    virtual void queueWriteClose() = 0;
+    virtual bool writeQueueEmpty() = 0;
+    virtual BufferBase* getQueuedBuffer() = 0;
+
+protected:
+    // Derived class manages lifetime; must be constructed using the
+    // static create() method. Deletes not allowed from outside.
+    AsynchIO() {}
+    virtual ~AsynchIO() {}
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Oct 21 11:29:44 2008
@@ -33,7 +33,7 @@
 namespace sys {
 
 class AsynchIO;
-class AsynchIOBufferBase;
+struct AsynchIOBufferBase;
 class Socket;
 
 class AsynchIOHandler : public OutputControl {

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "DispatchHandle.h"
+
+#include <boost/cast.hpp>
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+DispatchHandle::~DispatchHandle() {
+    stopWatch();
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
+    bool r = readableCallback;
+    bool w = writableCallback;
+
+    ScopedLock<Mutex> lock(stateLock);
+    assert(state == IDLE);
+
+    // If no callbacks set then do nothing (that is what we were asked to do!)
+    // TODO: Maybe this should be an assert instead
+    if (!r && !w) {
+        state = INACTIVE;
+        return;
+    }
+
+    Poller::Direction d = r ?
+        (w ? Poller::INOUT : Poller::INPUT) :
+        Poller::OUTPUT;
+
+    poller = poller0;
+    poller->addFd(*this, d);
+    
+    state = r ?
+        (w ? ACTIVE_RW : ACTIVE_R) :
+        ACTIVE_W;
+}
+
+void DispatchHandle::rewatch() {
+    bool r = readableCallback;
+    bool w = writableCallback;
+
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_INACTIVE:
+        state = r ?
+            (w ? DELAYED_RW : DELAYED_R) :
+            DELAYED_W;
+        break;
+    case DELAYED_DELETE:
+        break;
+    case INACTIVE:
+    case ACTIVE_R:
+    case ACTIVE_W: {
+        assert(poller);
+        Poller::Direction d = r ?
+            (w ? Poller::INOUT : Poller::INPUT) :
+            Poller::OUTPUT;
+        poller->modFd(*this, d);
+        state = r ?
+            (w ? ACTIVE_RW : ACTIVE_R) :
+            ACTIVE_W;
+        break;
+        }
+    case DELAYED_RW:
+    case ACTIVE_RW:
+        // Don't need to do anything already waiting for readable/writable
+        break;
+    }
+}
+
+void DispatchHandle::rewatchRead() {
+    if (!readableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_R:
+    case DELAYED_RW:
+    case DELAYED_DELETE:
+        break;
+    case DELAYED_W:
+        state = DELAYED_RW;
+        break;
+    case DELAYED_INACTIVE:
+        state = DELAYED_R;
+        break;
+    case ACTIVE_R:
+    case ACTIVE_RW:
+        // Nothing to do: already waiting for readable
+        break;
+    case INACTIVE:
+        assert(poller);
+        poller->modFd(*this, Poller::INPUT);
+        state = ACTIVE_R;
+        break;
+    case ACTIVE_W:
+        assert(poller);
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        break;
+    }
+}
+
+void DispatchHandle::rewatchWrite() {
+    if (!writableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_DELETE:
+        break;
+    case DELAYED_R:
+        state = DELAYED_RW;
+        break;
+    case DELAYED_INACTIVE:
+        state = DELAYED_W;
+        break;
+    case INACTIVE:
+        assert(poller);
+        poller->modFd(*this, Poller::OUTPUT);
+        state = ACTIVE_W;
+        break;
+    case ACTIVE_R:
+        assert(poller);
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        break;
+    case ACTIVE_W:
+    case ACTIVE_RW:
+        // Nothing to do: already waiting for writable
+        break;
+   }
+}
+
+void DispatchHandle::unwatchRead() {
+    if (!readableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_R:
+        state = DELAYED_INACTIVE;
+        break;
+    case DELAYED_RW:
+        state = DELAYED_W;    
+        break;
+    case DELAYED_W:
+    case DELAYED_INACTIVE:
+    case DELAYED_DELETE:
+        break;
+    case ACTIVE_R:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    case ACTIVE_RW:
+        assert(poller);
+        poller->modFd(*this, Poller::OUTPUT);
+        state = ACTIVE_W;
+        break;
+    case ACTIVE_W:
+    case INACTIVE:
+        break;
+    }
+}
+
+void DispatchHandle::unwatchWrite() {
+    if (!writableCallback) {
+        return;
+    }
+    
+    ScopedLock<Mutex> lock(stateLock);
+    switch(state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_W:
+        state = DELAYED_INACTIVE;
+        break;
+    case DELAYED_RW:
+        state = DELAYED_R;
+        break;
+    case DELAYED_R:
+    case DELAYED_INACTIVE:
+    case DELAYED_DELETE:
+        break;
+    case ACTIVE_W:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    case ACTIVE_RW:
+        assert(poller);
+        poller->modFd(*this, Poller::INPUT);
+        state = ACTIVE_R;
+        break;
+    case ACTIVE_R:
+    case INACTIVE:
+        break;
+   }
+}
+
+void DispatchHandle::unwatch() {
+    ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case IDLE:
+    case DELAYED_IDLE:
+        break;
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+        state = DELAYED_INACTIVE;
+        break;
+    case DELAYED_DELETE:
+        break;
+    default:
+        assert(poller);
+        poller->modFd(*this, Poller::NONE);
+        state = INACTIVE;
+        break;
+    }            
+}
+
+void DispatchHandle::stopWatch() {
+    ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case IDLE:
+    case DELAYED_IDLE:
+    case DELAYED_DELETE:
+    	return;
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+    	state = DELAYED_IDLE;
+    	break;
+    default:
+	    state = IDLE;
+	    break;
+    }
+    assert(poller);
+    poller->delFd(*this);
+    poller.reset();
+}
+
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+    // Ensure that we're no longer watching anything
+    stopWatch();
+
+    // If we're in the middle of a callback defer the delete
+    {
+    ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case DELAYED_IDLE:
+    case DELAYED_DELETE:
+        state = DELAYED_DELETE;
+        return;
+    case IDLE:
+    	break;
+    default:
+    	// Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+        assert(false);
+    }
+    }
+    // If we're not then do it right away
+    delete this;
+}
+
+void DispatchHandle::processEvent(Poller::EventType type) {
+    // Note that we are now doing the callbacks
+    {
+    ScopedLock<Mutex> lock(stateLock);
+    
+    // Set up to wait for same events next time unless reset
+    switch(state) {
+    case ACTIVE_R:
+        state = DELAYED_R;
+        break;
+    case ACTIVE_W:
+        state = DELAYED_W;
+        break;
+    case ACTIVE_RW:
+        state = DELAYED_RW;
+        break;
+    // Can only get here in a DELAYED_* state in the rare case
+    // that we're already here for reading and we get activated for
+    // writing and we can write (it might be possible the other way
+    // round too). In this case we're already processing the handle
+    // in a different thread in this function so return right away
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+    case DELAYED_IDLE:
+    case DELAYED_DELETE:
+        return;
+    default:
+        assert(false);
+    }
+    }
+
+    // Do callbacks - whilst we are doing the callbacks we are prevented from processing
+    // the same handle until we re-enable it. To avoid re-entering the callbacks for a single
+    // handle re-enabling in the callbacks is actually deferred until they are complete.
+    switch (type) {
+    case Poller::READABLE:
+        readableCallback(*this);
+        break;
+    case Poller::WRITABLE:
+        writableCallback(*this);
+        break;
+    case Poller::READ_WRITABLE:
+        readableCallback(*this);
+        writableCallback(*this);
+        break;
+    case Poller::DISCONNECTED:
+        {
+        ScopedLock<Mutex> lock(stateLock);
+        state = DELAYED_INACTIVE;
+        }
+        if (disconnectedCallback) {
+            disconnectedCallback(*this);
+        }
+        break;
+    default:
+        assert(false);
+    }
+
+    // If any of the callbacks re-enabled reading/writing then actually
+    // do it now
+    {
+    ScopedLock<Mutex> lock(stateLock);
+    switch (state) {
+    case DELAYED_R:
+        poller->modFd(*this, Poller::INPUT);
+        state = ACTIVE_R;
+        return;
+    case DELAYED_W:
+        poller->modFd(*this, Poller::OUTPUT);
+        state = ACTIVE_W;
+        return;
+    case DELAYED_RW:
+        poller->modFd(*this, Poller::INOUT);
+        state = ACTIVE_RW;
+        return;
+    case DELAYED_INACTIVE:
+        state = INACTIVE;
+        return;
+    case DELAYED_IDLE:
+    	state = IDLE;
+    	return;
+    default:
+        // This should be impossible
+        assert(false);
+        return;
+    case DELAYED_DELETE:
+        break;
+    }
+    }      
+    delete this;
+}
+
+}}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,146 @@
+#ifndef _sys_DispatchHandle_h
+#define _sys_DispatchHandle_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Poller.h"
+#include "Mutex.h"
+
+#include <boost/function.hpp>
+
+
+namespace qpid {
+namespace sys {
+
+class DispatchHandleRef;
+/**
+ * In order to have your own handle (file descriptor on Unix) watched by the poller
+ * you need to:
+ * 
+ * - Subclass IOHandle, in the constructor supply an appropriate
+ *    IOHandlePrivate object for the platform.
+ *    
+ * - Construct a DispatchHandle passing it your IOHandle and 
+ *   callback functions for read, write and disconnect events.
+ *
+ * - Ensure the DispatchHandle is not deleted until the poller is no longer using it.
+ *   TODO: astitcher document DispatchHandleRef to simplify this.
+ *   
+ * When an event occurs on the handle, the poller calls the relevant callback and
+ * stops watching that handle. Your callback can call rewatch() or related functions
+ * to re-enable polling.
+ */
+class DispatchHandle : public PollerHandle {
+    friend class DispatchHandleRef;
+public:
+    typedef boost::function1<void, DispatchHandle&> Callback;
+
+private:
+    Callback readableCallback;
+    Callback writableCallback;
+    Callback disconnectedCallback;
+    Poller::shared_ptr poller;
+    Mutex stateLock;
+    enum {
+        IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+        DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
+        DELAYED_DELETE
+    } state;
+
+public:
+    /**
+     * Provide a handle to poll and a set of callbacks.  Note
+     * callbacks can be 0, meaning you are not interested in that
+     * event.
+     * 
+     *@param h: the handle to watch. The IOHandle encapsulates a
+     * platform-specific handle to an IO object (e.g. a file descriptor
+     * on Unix.)
+     *@param rCb Callback called when the handle is readable.
+     *@param wCb Callback called when the handle is writable.
+     *@param dCb Callback called when the handle is disconnected.
+     */
+    DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+      PollerHandle(h),
+      readableCallback(rCb),
+      writableCallback(wCb),
+      disconnectedCallback(dCb),
+      state(IDLE)
+    {}
+
+    ~DispatchHandle();
+
+    /** Add this DispatchHandle to the poller to be watched. */
+    void startWatch(Poller::shared_ptr poller);
+
+    /** Resume watching for all non-0 callbacks. */
+    void rewatch();
+    /** Resume watching for read only. */
+    void rewatchRead();
+
+    /** Resume watching for write only. */
+    void rewatchWrite();
+
+    /** Stop watching temporarily. The DispatchHandle remains
+        associated with the poller and can be re-activated using
+        rewatch. */
+    void unwatch();
+    /** Stop watching for read */
+    void unwatchRead();
+    /** Stop watching for write */
+    void unwatchWrite();
+
+    /** Stop watching permanently. Disassociates from the poller. */
+    void stopWatch();
+    
+protected:
+    /** Override to get extra processing done when the DispatchHandle is deleted. */
+    void doDelete();
+
+private:
+    void processEvent(Poller::EventType dir);
+};
+
+class DispatchHandleRef {
+    DispatchHandle* ref;
+
+public:
+    typedef boost::function1<void, DispatchHandle&> Callback;
+    DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+      ref(new DispatchHandle(h, rCb, wCb, dCb))
+    {}
+
+    ~DispatchHandleRef() { ref->doDelete(); }
+
+    void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); }
+    void rewatch() { ref->rewatch(); }
+    void rewatchRead() { ref->rewatchRead(); }
+    void rewatchWrite() { ref->rewatchWrite(); }
+    void unwatch() { ref->unwatch(); }
+    void unwatchRead() { ref->unwatchRead(); }
+    void unwatchWrite() { ref->unwatchWrite(); }
+    void stopWatch() { ref->stopWatch(); }
+};
+
+}}
+
+#endif // _sys_DispatchHandle_h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Tue Oct 21 11:29:44 2008
@@ -21,8 +21,6 @@
 
 #include "Dispatcher.h"
 
-#include <boost/cast.hpp>
-
 #include <assert.h>
 
 namespace qpid {
@@ -58,382 +56,4 @@
     ;
 }
 
-DispatchHandle::~DispatchHandle() {
-    stopWatch();
-}
-
-void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
-    bool r = readableCallback;
-    bool w = writableCallback;
-
-    ScopedLock<Mutex> lock(stateLock);
-    assert(state == IDLE);
-
-    // If no callbacks set then do nothing (that is what we were asked to do!)
-    // TODO: Maybe this should be an assert instead
-    if (!r && !w) {
-        state = INACTIVE;
-        return;
-    }
-
-    Poller::Direction d = r ?
-        (w ? Poller::INOUT : Poller::IN) :
-        Poller::OUT;
-
-    poller = poller0;
-    poller->addFd(*this, d);
-    
-    state = r ?
-        (w ? ACTIVE_RW : ACTIVE_R) :
-        ACTIVE_W;
-}
-
-void DispatchHandle::rewatch() {
-    bool r = readableCallback;
-    bool w = writableCallback;
-
-    ScopedLock<Mutex> lock(stateLock);
-    switch(state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_INACTIVE:
-        state = r ?
-            (w ? DELAYED_RW : DELAYED_R) :
-            DELAYED_W;
-        break;
-    case DELAYED_DELETE:
-        break;
-    case INACTIVE:
-    case ACTIVE_R:
-    case ACTIVE_W: {
-        assert(poller);
-        Poller::Direction d = r ?
-            (w ? Poller::INOUT : Poller::IN) :
-            Poller::OUT;
-        poller->modFd(*this, d);
-        state = r ?
-            (w ? ACTIVE_RW : ACTIVE_R) :
-            ACTIVE_W;
-        break;
-        }
-    case DELAYED_RW:
-    case ACTIVE_RW:
-        // Don't need to do anything already waiting for readable/writable
-        break;
-    }
-}
-
-void DispatchHandle::rewatchRead() {
-    if (!readableCallback) {
-        return;
-    }
-    
-    ScopedLock<Mutex> lock(stateLock);
-    switch(state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_RW:
-    case DELAYED_DELETE:
-        break;
-    case DELAYED_W:
-        state = DELAYED_RW;
-        break;
-    case DELAYED_INACTIVE:
-        state = DELAYED_R;
-        break;
-    case ACTIVE_R:
-    case ACTIVE_RW:
-        // Nothing to do: already waiting for readable
-        break;
-    case INACTIVE:
-        assert(poller);
-        poller->modFd(*this, Poller::IN);
-        state = ACTIVE_R;
-        break;
-    case ACTIVE_W:
-        assert(poller);
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
-        break;
-    }
-}
-
-void DispatchHandle::rewatchWrite() {
-    if (!writableCallback) {
-        return;
-    }
-    
-    ScopedLock<Mutex> lock(stateLock);
-    switch(state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_DELETE:
-        break;
-    case DELAYED_R:
-        state = DELAYED_RW;
-        break;
-    case DELAYED_INACTIVE:
-        state = DELAYED_W;
-        break;
-    case INACTIVE:
-        assert(poller);
-        poller->modFd(*this, Poller::OUT);
-        state = ACTIVE_W;
-        break;
-    case ACTIVE_R:
-        assert(poller);
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
-        break;
-    case ACTIVE_W:
-    case ACTIVE_RW:
-        // Nothing to do: already waiting for writable
-        break;
-   }
-}
-
-void DispatchHandle::unwatchRead() {
-    if (!readableCallback) {
-        return;
-    }
-    
-    ScopedLock<Mutex> lock(stateLock);
-    switch(state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_RW:
-        state = DELAYED_W;    
-        break;
-    case DELAYED_W:
-    case DELAYED_INACTIVE:
-    case DELAYED_DELETE:
-        break;
-    case ACTIVE_R:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
-        break;
-    case ACTIVE_RW:
-        assert(poller);
-        poller->modFd(*this, Poller::OUT);
-        state = ACTIVE_W;
-        break;
-    case ACTIVE_W:
-    case INACTIVE:
-        break;
-    }
-}
-
-void DispatchHandle::unwatchWrite() {
-    if (!writableCallback) {
-        return;
-    }
-    
-    ScopedLock<Mutex> lock(stateLock);
-    switch(state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_W:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_RW:
-        state = DELAYED_R;
-        break;
-    case DELAYED_R:
-    case DELAYED_INACTIVE:
-    case DELAYED_DELETE:
-        break;
-    case ACTIVE_W:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
-        break;
-    case ACTIVE_RW:
-        assert(poller);
-        poller->modFd(*this, Poller::IN);
-        state = ACTIVE_R;
-        break;
-    case ACTIVE_R:
-    case INACTIVE:
-        break;
-   }
-}
-
-void DispatchHandle::unwatch() {
-    ScopedLock<Mutex> lock(stateLock);
-    switch (state) {
-    case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_DELETE:
-        break;
-    default:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
-        break;
-    }            
-}
-
-void DispatchHandle::stopWatch() {
-    ScopedLock<Mutex> lock(stateLock);
-    switch (state) {
-    case IDLE:
-    case DELAYED_IDLE:
-    case DELAYED_DELETE:
-    	return;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-    	state = DELAYED_IDLE;
-    	break;
-    default:
-	    state = IDLE;
-	    break;
-    }
-    assert(poller);
-    poller->delFd(*this);
-    poller.reset();
-}
-
-// The slightly strange switch structure
-// is to ensure that the lock is released before
-// we do the delete
-void DispatchHandle::doDelete() {
-    // Ensure that we're no longer watching anything
-    stopWatch();
-
-    // If we're in the middle of a callback defer the delete
-    {
-    ScopedLock<Mutex> lock(stateLock);
-    switch (state) {
-    case DELAYED_IDLE:
-    case DELAYED_DELETE:
-        state = DELAYED_DELETE;
-        return;
-    case IDLE:
-    	break;
-    default:
-    	// Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
-        assert(false);
-    }
-    }
-    // If we're not then do it right away
-    delete this;
-}
-
-void DispatchHandle::processEvent(Poller::EventType type) {
-    // Note that we are now doing the callbacks
-    {
-    ScopedLock<Mutex> lock(stateLock);
-    
-    // Set up to wait for same events next time unless reset
-    switch(state) {
-    case ACTIVE_R:
-        state = DELAYED_R;
-        break;
-    case ACTIVE_W:
-        state = DELAYED_W;
-        break;
-    case ACTIVE_RW:
-        state = DELAYED_RW;
-        break;
-    // Can only get here in a DELAYED_* state in the rare case
-    // that we're already here for reading and we get activated for
-    // writing and we can write (it might be possible the other way
-    // round too). In this case we're already processing the handle
-    // in a different thread in this function so return right away
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-    case DELAYED_IDLE:
-    case DELAYED_DELETE:
-        return;
-    default:
-        assert(false);
-    }
-    }
-
-    // Do callbacks - whilst we are doing the callbacks we are prevented from processing
-    // the same handle until we re-enable it. To avoid rentering the callbacks for a single
-    // handle re-enabling in the callbacks is actually deferred until they are complete.
-    switch (type) {
-    case Poller::READABLE:
-        readableCallback(*this);
-        break;
-    case Poller::WRITABLE:
-        writableCallback(*this);
-        break;
-    case Poller::READ_WRITABLE:
-        readableCallback(*this);
-        writableCallback(*this);
-        break;
-    case Poller::DISCONNECTED:
-        {
-        ScopedLock<Mutex> lock(stateLock);
-        state = DELAYED_INACTIVE;
-        }
-        if (disconnectedCallback) {
-            disconnectedCallback(*this);
-        }
-        break;
-    default:
-        assert(false);
-    }
-
-    // If any of the callbacks re-enabled reading/writing then actually
-    // do it now
-    {
-    ScopedLock<Mutex> lock(stateLock);
-    switch (state) {
-    case DELAYED_R:
-        poller->modFd(*this, Poller::IN);
-        state = ACTIVE_R;
-        return;
-    case DELAYED_W:
-        poller->modFd(*this, Poller::OUT);
-        state = ACTIVE_W;
-        return;
-    case DELAYED_RW:
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
-        return;
-    case DELAYED_INACTIVE:
-        state = INACTIVE;
-        return;
-    case DELAYED_IDLE:
-    	state = IDLE;
-    	return;
-    default:
-        // This should be impossible
-        assert(false);
-        return;
-    case DELAYED_DELETE:
-        break;
-    }
-    }      
-    delete this;
-}
-
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Tue Oct 21 11:29:44 2008
@@ -24,129 +24,10 @@
 
 #include "Poller.h"
 #include "Runnable.h"
-#include "Mutex.h"
-
-#include <memory>
-#include <queue>
-#include <boost/function.hpp>
-
-#include <assert.h>
-
 
 namespace qpid {
 namespace sys {
 
-class DispatchHandleRef;
-/**
- * In order to have your own handle (file descriptor on Unix) watched by the poller
- * you need to:
- * 
- * - Subclass IOHandle, in the constructor supply an appropriate
- *    IOHandlerPrivate object for the platform.
- *    
- * - Construct a DispatchHandle passing it your IOHandle and 
- *   callback functions for read, write and disconnect events.
- *
- * - Ensure the DispatchHandle is not deleted until the poller is no longer using it.
- *   TODO: astitcher document DispatchHandleRef to simplify this.
- *   
- * When an event occurs on the handle, the poller calls the relevant callback and
- * stops watching that handle. Your callback can call rewatch() or related functions
- * to re-enable polling.
- */
-class DispatchHandle : public PollerHandle {
-    friend class DispatchHandleRef;
-public:
-    typedef boost::function1<void, DispatchHandle&> Callback;
-
-private:
-    Callback readableCallback;
-    Callback writableCallback;
-    Callback disconnectedCallback;
-    Poller::shared_ptr poller;
-    Mutex stateLock;
-    enum {
-        IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
-        DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
-        DELAYED_DELETE
-    } state;
-
-public:
-    /**
-     * Provide a handle to poll and a set of callbacks.  Note
-     * callbacks can be 0, meaning you are not interested in that
-     * event.
-     * 
-     *@param h: the handle to watch. The IOHandle encapsulates a
-     * platfrom-specific handle to an IO object (e.g. a file descriptor
-     * on Unix.)
-     *@param rCb Callback called when the handle is readable.
-     *@param wCb Callback called when the handle is writable.
-     *@param dCb Callback called when the handle is disconnected.
-     */
-    DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
-      PollerHandle(h),
-      readableCallback(rCb),
-      writableCallback(wCb),
-      disconnectedCallback(dCb),
-      state(IDLE)
-    {}
-
-    ~DispatchHandle();
-
-    /** Add this DispatchHandle to the poller to be watched. */
-    void startWatch(Poller::shared_ptr poller);
-
-    /** Resume watchingn for all non-0 callbacks. */
-    void rewatch();
-    /** Resume watchingn for read only. */
-    void rewatchRead();
-
-    /** Resume watchingn for write only. */
-    void rewatchWrite();
-
-    /** Stop watching temporarily. The DispatchHandle remains
-        associated with the poller and can be re-activated using
-        rewatch. */
-    void unwatch();
-    /** Stop watching for read */
-    void unwatchRead();
-    /** Stop watching for write */
-    void unwatchWrite();
-
-    /** Stop watching permanently. Disassociates from the poller. */
-    void stopWatch();
-    
-protected:
-    /** Override to get extra processing done when the DispatchHandle is deleted. */
-    void doDelete();
-
-private:
-    void processEvent(Poller::EventType dir);
-};
-
-class DispatchHandleRef {
-    DispatchHandle* ref;
-
-public:
-    typedef boost::function1<void, DispatchHandle&> Callback;
-    DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
-      ref(new DispatchHandle(h, rCb, wCb, dCb))
-    {}
-
-    ~DispatchHandleRef() { ref->doDelete(); }
-
-    void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); }
-    void rewatch() { ref->rewatch(); }
-    void rewatchRead() { ref->rewatchRead(); }
-    void rewatchWrite() { ref->rewatchWrite(); }
-    void unwatch() { ref->unwatch(); }
-    void unwatchRead() { ref->unwatchRead(); }
-    void unwatchWrite() { ref->unwatchWrite(); }
-    void stopWatch() { ref->stopWatch(); }
-};
-
-
 class Dispatcher : public Runnable {
     const Poller::shared_ptr poller;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Oct 21 11:29:44 2008
@@ -45,8 +45,8 @@
 
     enum Direction {
         NONE = 0,
-        IN,
-        OUT,
+        INPUT,
+        OUTPUT,
         INOUT
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Oct 21 11:29:44 2008
@@ -92,13 +92,14 @@
 
     if (isClient)
         async->setClient();
-    AsynchIO* aio = new AsynchIO(s,
-                                 boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                                 boost::bind(&AsynchIOHandler::eof, async, _1),
-                                 boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                                 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                                 boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                                 boost::bind(&AsynchIOHandler::idle, async, _1));
+    AsynchIO* aio = AsynchIO::create
+      (s,
+       boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+       boost::bind(&AsynchIOHandler::eof, async, _1),
+       boost::bind(&AsynchIOHandler::disconnect, async, _1),
+       boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+       boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+       boost::bind(&AsynchIOHandler::idle, async, _1));
 
     async->init(aio, 4);
     aio->start(poller);
@@ -133,9 +134,13 @@
     // is no longer needed.
 
     Socket* socket = new Socket();
-    new AsynchConnector (*socket, poller, host, port,
-                         boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true),
-                         failed);
+    AsynchConnector::create (*socket,
+                             poller,
+                             host,
+                             port,
+                             boost::bind(&AsynchIOProtocolFactory::established,
+                                         this, poller, _1, fact, true),
+                             failed);
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Oct 21 11:29:44 2008
@@ -162,9 +162,9 @@
 
     static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
         switch (dir) {
-            case Poller::IN: return ::EPOLLIN;
-            case Poller::OUT: return ::EPOLLOUT;
-            case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
+            case Poller::INPUT:  return ::EPOLLIN;
+            case Poller::OUTPUT: return ::EPOLLOUT;
+            case Poller::INOUT:  return ::EPOLLIN | ::EPOLLOUT;
             default: return 0;
         }
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Oct 21 11:29:44 2008
@@ -26,8 +26,8 @@
 
 #include "check.h"
 
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
+// TODO The basic algorithm here is not really POSIX specific and with a
+// bit more abstraction could (should) be promoted to be platform portable
 #include <unistd.h>
 #include <sys/socket.h>
 #include <signal.h>
@@ -65,24 +65,55 @@
 /*
  * Asynch Acceptor
  */
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptorPrivate {
+public:
+    AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+    void start(Poller::shared_ptr poller);
+
+private:
+    void readable(DispatchHandle& handle);
+
+private:
+    AsynchAcceptor::Callback acceptedCallback;
+    DispatchHandle handle;
+    const Socket& socket;
+
+};
+
+}} // namespace qpid::sys
 
 AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+  impl(new AsynchAcceptorPrivate(s, callback))
+{}
+
+AsynchAcceptor::~AsynchAcceptor()
+{ delete impl;}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+    impl->start(poller);
+}
+
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+                                             AsynchAcceptor::Callback callback) :
     acceptedCallback(callback),
-    handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+    handle(s, boost::bind(&AsynchAcceptorPrivate::readable, this, _1), 0, 0),
     socket(s) {
 
     s.setNonblocking();
     ignoreSigpipe();
 }
 
-void AsynchAcceptor::start(Poller::shared_ptr poller) {
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
     handle.startWatch(poller);
 }
 
 /*
  * We keep on accepting as long as there is something to accept
  */
-void AsynchAcceptor::readable(DispatchHandle& h) {
+void AsynchAcceptorPrivate::readable(DispatchHandle& h) {
     Socket* s;
     do {
         errno = 0;
@@ -106,6 +137,36 @@
 /*
  * Asynch Connector
  */
+namespace qpid {
+namespace sys {
+namespace posix {
+
+/*
+ * POSIX version of AsynchIO TCP socket connector.
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector,
+                        private DispatchHandle {
+
+private:
+    void connComplete(DispatchHandle& handle);
+    void failure(int, std::string);
+
+private:
+    ConnectedCallback connCallback;
+    FailedCallback failCallback;
+    const Socket& socket;
+
+public:
+    AsynchConnector(const Socket& socket,
+                    Poller::shared_ptr poller,
+                    std::string hostname,
+                    uint16_t port,
+                    ConnectedCallback connCb,
+                    FailedCallback failCb = 0);
+};
 
 AsynchConnector::AsynchConnector(const Socket& s,
                                  Poller::shared_ptr poller,
@@ -155,9 +216,85 @@
     DispatchHandle::doDelete();
 }
 
+} // namespace posix
+
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+                                                    Poller::shared_ptr poller,
+                                                    std::string hostname,
+                                                    uint16_t port,
+                                                    ConnectedCallback connCb,
+                                                    FailedCallback failCb)
+{
+    return new qpid::sys::posix::AsynchConnector(s,
+                                                 poller,
+                                                 hostname,
+                                                 port,
+                                                 connCb,
+                                                 failCb);
+}
+
 /*
- * Asynch reader/writer
+ * POSIX version of AsynchIO reader/writer
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
  */
+namespace posix {
+
+class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle {
+
+public:
+    AsynchIO(const Socket& s,
+             ReadCallback rCb,
+             EofCallback eofCb,
+             DisconnectCallback disCb,
+             ClosedCallback cCb = 0,
+             BuffersEmptyCallback eCb = 0,
+             IdleCallback iCb = 0);
+
+    // Methods inherited from qpid::sys::AsynchIO
+
+    virtual void queueForDeletion();
+
+    virtual void start(Poller::shared_ptr poller);
+    virtual void queueReadBuffer(BufferBase* buff);
+    virtual void unread(BufferBase* buff);
+    virtual void queueWrite(BufferBase* buff);
+    virtual void notifyPendingWrite();
+    virtual void queueWriteClose();
+    virtual bool writeQueueEmpty();
+    virtual BufferBase* getQueuedBuffer();
+
+private:
+    ~AsynchIO();
+
+    // Methods that are callback targets from Dispatcher.
+    void readable(DispatchHandle& handle);
+    void writeable(DispatchHandle& handle);
+    void disconnected(DispatchHandle& handle);
+    void close(DispatchHandle& handle);
+
+private:
+    ReadCallback readCallback;
+    EofCallback eofCallback;
+    DisconnectCallback disCallback;
+    ClosedCallback closedCallback;
+    BuffersEmptyCallback emptyCallback;
+    IdleCallback idleCallback;
+    const Socket& socket;
+    std::deque<BufferBase*> bufferQueue;
+    std::deque<BufferBase*> writeQueue;
+    bool queuedClose;
+    /**
+     * This flag is used to detect and handle concurrency between
+     * calls to notifyPendingWrite() (which can be made from any thread) and
+     * the execution of the writeable() method (which is always on the
+     * thread processing this handle).
+     */
+    volatile bool writePending;
+};
+
 AsynchIO::AsynchIO(const Socket& s,
                    ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
                    ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
@@ -239,6 +376,10 @@
     DispatchHandle::rewatchWrite();
 }
 
+bool AsynchIO::writeQueueEmpty() {
+    return writeQueue.empty();
+}
+
 /** Return a queued buffer if there are enough
  * to spare
  */
@@ -427,3 +568,17 @@
     }
 }
 
+} // namespace posix
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+                                      AsynchIO::ReadCallback rCb,
+                                      AsynchIO::EofCallback eofCb,
+                                      AsynchIO::DisconnectCallback disCb,
+                                      AsynchIO::ClosedCallback cCb,
+                                      AsynchIO::BuffersEmptyCallback eCb,
+                                      AsynchIO::IdleCallback iCb)
+{
+    return new qpid::sys::posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp Tue Oct 21 11:29:44 2008
@@ -129,9 +129,9 @@
 
     static uint32_t directionToPollEvent(Poller::Direction dir) {
         switch (dir) {
-            case Poller::IN: return POLLIN;
-            case Poller::OUT: return POLLOUT;
-            case Poller::INOUT: return POLLIN | POLLOUT;
+            case Poller::INPUT:  return POLLIN;
+            case Poller::OUTPUT: return POLLOUT;
+            case Poller::INOUT:  return POLLIN | POLLOUT;
             default: return 0;
         }
     }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,729 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+#include "IoHandlePrivate.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "check.h"
+
+#include <boost/thread/once.hpp>
+
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+#include <boost/bind.hpp>
+
+namespace {
+
+    // Shorthand for a scoped lock on a qpid Mutex; used below to guard the
+    // buffer and write queues.
+    typedef qpid::sys::ScopedLock<qpid::sys::Mutex>  QLock;
+
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.
+ *
+ * The read/write totals and counts are updated in AsynchIO::readComplete()
+ * and AsynchIO::writeComplete() further down in this file.
+ */
+QPID_TSS int threadReadTotal = 0;
+QPID_TSS int threadMaxRead = 0;
+QPID_TSS int threadReadCount = 0;
+QPID_TSS int threadWriteTotal = 0;
+QPID_TSS int threadWriteCount = 0;
+QPID_TSS int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+
+/*
+ * The function pointer for AcceptEx needs to be looked up at run time.
+ * Make sure this is done only once: lookUpAcceptExOnce guards the lookup
+ * and fnAcceptEx holds the result (0 until the lookup has succeeded).
+ * NOTE(review): no ConnectEx pointer is looked up here; connects are done
+ * synchronously by windows::AsynchConnector for now.
+ */
+boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
+LPFN_ACCEPTEX fnAcceptEx = 0;
+typedef void (*lookUpFunc)(const qpid::sys::Socket &);
+
+/*
+ * Resolve the AcceptEx extension function and store it in fnAcceptEx.
+ * Invoked through boost::call_once from the AsynchAcceptorPrivate
+ * constructor.
+ *
+ * A throw-away TCP socket is created only because WSAIoctl needs a socket
+ * to query; it is closed again before returning.
+ *
+ * Throws qpid::Exception if the pointer could not be obtained, whether
+ * because the scratch socket could not be created or because the WSAIoctl
+ * query itself failed.
+ */
+void lookUpAcceptEx() {
+    SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (h != INVALID_SOCKET) {
+        GUID guidAcceptEx = WSAID_ACCEPTEX;
+        DWORD dwBytes = 0;
+        int status = WSAIoctl(h,
+                              SIO_GET_EXTENSION_FUNCTION_POINTER,
+                              &guidAcceptEx,
+                              sizeof(guidAcceptEx),
+                              &fnAcceptEx,
+                              sizeof(fnAcceptEx),
+                              &dwBytes,
+                              NULL,
+                              NULL);
+        // Don't trust whatever is in the output buffer if the query failed.
+        if (status == SOCKET_ERROR)
+            fnAcceptEx = 0;
+        closesocket(h);
+    }
+    if (fnAcceptEx == 0)
+        throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+}
+
+}
+
+namespace qpid {
+namespace sys {
+
+/*
+ * Asynch Acceptor
+ *
+ * This implementation uses knowledge that the DispatchHandle handle member
+ * is derived from PollerHandle, which has a reference to the Socket.
+ * No dispatching features of DispatchHandle are used - we just use the
+ * conduit to the Socket.
+ *
+ * AsynchAcceptor uses an AsynchAcceptResult object to track completion
+ * and status of each accept operation outstanding.
+ */
+
+/*
+ * Implementation object held by AsynchAcceptor (via its impl pointer).
+ * It keeps the user's accept callback and a reference to the listening
+ * socket, and posts AcceptEx operations on that socket.
+ */
+class AsynchAcceptorPrivate {
+
+    // AsynchAcceptResult calls restart() after each completed accept.
+    friend class AsynchAcceptResult;
+
+public:
+    AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+    ~AsynchAcceptorPrivate();
+    // Register the listening socket with the poller and post the first accept.
+    void start(Poller::shared_ptr poller);
+
+private:
+    // Post a new AcceptEx operation on the listening socket.
+    void restart(void);
+
+    // Invoked with each newly accepted Socket.
+    AsynchAcceptor::Callback acceptedCallback;
+    // Listening socket; held by reference and closed in the destructor.
+    const Socket& socket;
+};
+
+// The public AsynchAcceptor only forwards to its private implementation
+// object, which it creates here and destroys in the destructor.
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
+  : impl(new AsynchAcceptorPrivate(s, callback))
+{
+}
+
+AsynchAcceptor::~AsynchAcceptor()
+{
+    delete impl;
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller)
+{
+    impl->start(poller);
+}
+
+/*
+ * Remember the callback and listening socket, switch the socket to
+ * non-blocking mode and make sure the AcceptEx pointer has been resolved.
+ */
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+                                             AsynchAcceptor::Callback callback)
+  : acceptedCallback(callback), socket(s)
+{
+    socket.setNonblocking();
+
+    // boost::call_once took (function, flag) before boost 1.35 and takes
+    // (flag, function) from 1.35 onwards.
+#if (BOOST_VERSION < 103500)
+    boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
+#else
+    boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
+#endif
+}
+
+// Tearing down the acceptor closes the listening socket.
+AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void)
+{
+    socket.close();
+}
+
+// Register the listening socket with the poller, then post the first accept.
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller)
+{
+    poller->addFd(PollerHandle(socket), Poller::INPUT);
+    restart();
+}
+
+/*
+ * Post one overlapped AcceptEx on the listening socket.
+ *
+ * A fresh AsynchAcceptResult is allocated for the operation; it carries the
+ * pre-created socket for the incoming connection, the address buffer that
+ * AcceptEx fills in, and the OVERLAPPED structure. The result object deletes
+ * itself from its success()/failure() handlers.
+ *
+ * This is a member of AsynchAcceptorPrivate (where it is declared and where
+ * acceptedCallback and socket live), not of the public AsynchAcceptor.
+ */
+void AsynchAcceptorPrivate::restart(void) {
+    DWORD bytesReceived = 0;  // Not used, needed for AcceptEx API
+    AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
+                                                        this,
+                                                        toFd(socket.impl));
+    BOOL status;
+    // Receive length 0: complete as soon as the connection arrives instead
+    // of waiting for the first data.
+    status = ::fnAcceptEx(toFd(socket.impl),
+                          toFd(result->newSocket->impl),
+                          result->addressBuffer,
+                          0,
+                          AsynchAcceptResult::SOCKADDRMAXLEN,
+                          AsynchAcceptResult::SOCKADDRMAXLEN,
+                          &bytesReceived,
+                          result->overlapped());
+    QPID_WINDOWS_CHECK_ASYNC_START(status);
+}
+
+
+/*
+ * Set up the bookkeeping for one accept operation.
+ *
+ * cb       - callback to hand the accepted Socket to.
+ * acceptor - implementation object to restart() once this accept completes.
+ * listener - raw handle of the listening socket.
+ *
+ * The parameter types match the declaration in AsynchIoResult.h.
+ */
+AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
+                                       AsynchAcceptorPrivate *acceptor,
+                                       SOCKET listener)
+  : callback(cb), acceptor(acceptor), listener(listener) {
+    newSocket.reset (new Socket());
+}
+
+/*
+ * An AcceptEx completed: finish setting up the accepted socket, hand it to
+ * the user callback, post the next accept and dispose of this result.
+ */
+void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
+    // Associate the accepted socket with the listener it came from.
+    const SOCKET accepted = toFd(newSocket->impl);
+    ::setsockopt(accepted,
+                 SOL_SOCKET,
+                 SO_UPDATE_ACCEPT_CONTEXT,
+                 reinterpret_cast<char*>(&listener),
+                 sizeof(listener));
+
+    // The Socket leaves the auto_ptr here; the callback receives it by
+    // reference.
+    Socket* connected = newSocket.release();
+    callback(*connected);
+
+    acceptor->restart();
+    delete this;
+}
+
+/*
+ * An AcceptEx failed. The result object is disposed of; no new accept is
+ * posted from here.
+ *
+ * status - the error code reported for the operation.
+ *
+ * WSA_OPERATION_ABORTED is treated as routine and stays silent; any other
+ * error is logged so that it is not lost.
+ */
+void AsynchAcceptResult::failure(int status) {
+    if (status != WSA_OPERATION_ABORTED) {
+        QPID_LOG(warning, "Asynchronous accept failed; error " << status);
+    }
+    delete this;
+}
+
+namespace windows {
+
+/*
+ * AsynchConnector does synchronous connects for now... to do asynch the
+ * IocpPoller will need some extension to register an event handle as a
+ * CONNECT-type "direction", the connect completion/result will need an
+ * event handle to associate with the connecting handle. But there's no
+ * time for that right now...
+ */
+/*
+ * Windows implementation of qpid::sys::AsynchConnector. All the work is
+ * done in the constructor, which connects synchronously and then invokes
+ * one of the two callbacks.
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+    // Invoked with the socket once the connect has succeeded.
+    ConnectedCallback connCallback;
+    // Invoked (if set) with an error code and message when the connect fails.
+    FailedCallback failCallback;
+    // Socket to connect; held by reference.
+    const Socket& socket;
+
+public:
+    AsynchConnector(const Socket& socket,
+                    Poller::shared_ptr poller,
+                    std::string hostname,
+                    uint16_t port,
+                    ConnectedCallback connCb,
+                    FailedCallback failCb = 0);
+};
+
+/*
+ * Connect the socket to hostname:port right away (synchronously).
+ *
+ * On success connCb is called with the socket. If anything in that sequence
+ * throws a std::exception, failCb (when set) is called with -1 and the
+ * exception text, and the socket is closed and deleted.
+ * The poller argument is not used by this implementation.
+ */
+AsynchConnector::AsynchConnector(const Socket& sock,
+                                 Poller::shared_ptr poller,
+                                 std::string hostname,
+                                 uint16_t port,
+                                 ConnectedCallback connCb,
+                                 FailedCallback failCb)
+  : connCallback(connCb),
+    failCallback(failCb),
+    socket(sock)
+{
+    socket.setNonblocking();
+    try {
+        socket.connect(hostname, port);
+        connCallback(socket);
+    }
+    catch (std::exception& problem) {
+        if (failCallback) {
+            failCallback(-1, std::string(problem.what()));
+        }
+        socket.close();
+        delete &socket;
+    }
+}
+
+} // namespace windows
+
+/*
+ * Factory for the platform-neutral AsynchConnector interface on Windows:
+ * allocates a windows::AsynchConnector, whose constructor performs the
+ * connect, and returns it as a qpid::sys::AsynchConnector pointer.
+ */
+AsynchConnector* qpid::sys::AsynchConnector::create(
+    const Socket& s,
+    Poller::shared_ptr poller,
+    std::string hostname,
+    uint16_t port,
+    ConnectedCallback connCb,
+    FailedCallback failCb)
+{
+    qpid::sys::windows::AsynchConnector* connector =
+        new qpid::sys::windows::AsynchConnector(s, poller, hostname, port,
+                                                connCb, failCb);
+    return connector;
+}
+
+
+/*
+ * Asynch reader/writer
+ */
+
+namespace windows {
+
+/*
+ * Windows implementation of qpid::sys::AsynchIO using overlapped socket I/O.
+ *
+ * Reads and writes are started with WSARecv/WSASend; their results come
+ * back through completion(), which serializes processing so that only one
+ * thread at a time acts on a given connection.
+ */
+class AsynchIO : public qpid::sys::AsynchIO {
+public:
+    AsynchIO(const Socket& s,
+             ReadCallback rCb,
+             EofCallback eofCb,
+             DisconnectCallback disCb,
+             ClosedCallback cCb = 0,
+             BuffersEmptyCallback eCb = 0,
+             IdleCallback iCb = 0);
+    ~AsynchIO();
+
+    // Methods inherited from qpid::sys::AsynchIO
+
+    /**
+     * Notify the object it should delete itself as soon as possible.
+     */
+    virtual void queueForDeletion();
+
+    /// Take any actions needed to prepare for working with the poller.
+    virtual void start(Poller::shared_ptr poller);
+    virtual void queueReadBuffer(BufferBase* buff);
+    virtual void unread(BufferBase* buff);
+    virtual void queueWrite(BufferBase* buff);
+    virtual void notifyPendingWrite();
+    virtual void queueWriteClose();
+    virtual bool writeQueueEmpty();
+
+    /**
+     * getQueuedBuffer returns a buffer from the buffer queue, if one is
+     * available.
+     *
+     * @retval Pointer to BufferBase buffer; 0 if none is available.
+     */
+    virtual BufferBase* getQueuedBuffer();
+
+private:
+    ReadCallback readCallback;
+    EofCallback eofCallback;
+    DisconnectCallback disCallback;
+    ClosedCallback closedCallback;
+    BuffersEmptyCallback emptyCallback;
+    IdleCallback idleCallback;
+    const Socket& socket;
+    Poller::shared_ptr poller;
+
+    std::deque<BufferBase*> bufferQueue;
+    std::deque<BufferBase*> writeQueue;
+    /* The MSVC-supplied deque is not thread-safe; keep a lock to serialize
+     * access to the buffer queue and write queue.
+     */
+    Mutex bufferQueueLock;
+
+    // Number of outstanding I/O operations.
+    volatile LONG opsInProgress;
+    // Is there a write in progress?
+    volatile bool writeInProgress;
+    // Deletion requested, but there are callbacks in progress.
+    volatile bool queuedDelete;
+    // Socket close requested, but there are operations in progress.
+    volatile bool queuedClose;
+
+private:
+    void close(void);
+
+    /*
+     * Helpers that invoke the corresponding user callback if it is set.
+     * They are defined out of line below, so they must be declared here.
+     */
+    void dispatchReadComplete(BufferBase *buffer);
+    void notifyEof(void);
+    void notifyDisconnect(void);
+    void notifyClosed(void);
+    void notifyBuffersEmpty(void);
+    void notifyIdle(void);
+
+    /**
+     * Initiate a read operation. AsynchIO::dispatchReadComplete() will be
+     * called when the read is complete and data is available.
+     */
+    virtual void startRead(void);
+
+    /**
+     * Initiate a write of the specified buffer. There's no callback for
+     * write completion to the AsynchIO object.
+     */
+    virtual void startWrite(AsynchIO::BufferBase* buff);
+
+    virtual bool writesNotComplete();
+
+    /**
+     * readComplete is called when a read request is complete.
+     *
+     * @param result Results of the operation.
+     */
+    void readComplete(AsynchReadResult *result);
+
+    /**
+     * writeComplete is called when a write request is complete.
+     *
+     * @param result Results of the operation.
+     */
+    void writeComplete(AsynchWriteResult *result);
+
+    /**
+     * Queue of completions to run. This queue enforces the requirement
+     * from upper layers that only one thread at a time is allowed to act
+     * on any given connection. Once a thread is busy processing a completion
+     * on this object, other threads that dispatch completions queue the
+     * completions here for the in-progress thread to handle when done.
+     * Thus, any threads can dispatch a completion from the IocpPoller, but
+     * this class ensures that actual processing at the connection level is
+     * only on one thread at a time.
+     */
+    std::queue<AsynchIoResult *> completionQueue;
+    volatile bool working;
+    Mutex completionLock;
+
+    /**
+     * Called when there's a completion to process.
+     */
+    void completion(AsynchIoResult *result);
+};
+
+/*
+ * Store the callbacks and socket; nothing is started until start() is
+ * called. All counters and state flags begin cleared.
+ */
+AsynchIO::AsynchIO(const Socket& s,
+                   ReadCallback rCb,
+                   EofCallback eofCb,
+                   DisconnectCallback disCb,
+                   ClosedCallback cCb,
+                   BuffersEmptyCallback eCb,
+                   IdleCallback iCb)
+  : readCallback(rCb), eofCallback(eofCb), disCallback(disCb),
+    closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb),
+    socket(s),
+    opsInProgress(0),
+    writeInProgress(false), queuedDelete(false), queuedClose(false),
+    working(false)
+{
+}
+
+// Function object that deletes whatever pointer it is handed.
+struct deleter
+{
+    template <typename T>
+    void operator()(T *doomed) { delete doomed; }
+};
+
+// Free every buffer still sitting in the read-buffer and write queues.
+AsynchIO::~AsynchIO() {
+    deleter destroy;
+    std::for_each(bufferQueue.begin(), bufferQueue.end(), destroy);
+    std::for_each(writeQueue.begin(), writeQueue.end(), destroy);
+}
+
+/*
+ * Request deletion. With no operations outstanding the object is deleted
+ * on the spot; otherwise the request is recorded and completion() performs
+ * the delete once the outstanding operations have drained.
+ */
+void AsynchIO::queueForDeletion() {
+    queuedDelete = true;
+    if (opsInProgress <= 0) {
+        delete this;
+        return;
+    }
+
+    QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+    // AsynchIOHandler calls this then deletes itself; don't do any more
+    // callbacks.
+    readCallback = 0;
+    eofCallback = 0;
+    disCallback = 0;
+    closedCallback = 0;
+    emptyCallback = 0;
+    idleCallback = 0;
+}
+
+/*
+ * Remember the poller, register the socket with it, flush any writes that
+ * were queued before start() was called, and post the first read.
+ */
+void AsynchIO::start(Poller::shared_ptr poller0) {
+    poller = poller0;
+    poller->addFd(PollerHandle(socket), Poller::INPUT);
+    // Data may already have been queued for write before we got going.
+    if (!writeQueue.empty()) {
+        notifyPendingWrite();
+    }
+    startRead();
+}
+
+// Mark the buffer as empty and add it to the back of the read-buffer queue.
+void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+    assert(buff);
+    buff->dataCount = 0;
+    buff->dataStart = 0;
+    {
+        QLock guard(bufferQueueLock);
+        bufferQueue.push_back(buff);
+    }
+}
+
+/*
+ * Put a buffer that still holds data at the front of the read-buffer queue
+ * so the next read uses it first. Remaining data is first slid down to the
+ * start of the buffer.
+ */
+void AsynchIO::unread(AsynchIO::BufferBase* buff) {
+    assert(buff);
+    const bool needsCompaction = (buff->dataStart != 0);
+    if (needsCompaction) {
+        memmove(buff->bytes, buff->bytes + buff->dataStart, buff->dataCount);
+        buff->dataStart = 0;
+    }
+    QLock guard(bufferQueueLock);
+    bufferQueue.push_front(buff);
+}
+
+/*
+ * Append a buffer to the write queue. If no write is currently in flight,
+ * ask for a write-wanted completion so that the queue gets drained.
+ */
+void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+    assert(buff);
+    QLock guard(bufferQueueLock);
+    writeQueue.push_back(buff);
+    if (writeInProgress)
+        return;
+    notifyPendingWrite();
+}
+
+/*
+ * Ask the poller to deliver a "write wanted" completion to this object.
+ * Does nothing if start() has not supplied a poller yet.
+ */
+void AsynchIO::notifyPendingWrite() {
+    // This method is generally called from a processing thread; transfer
+    // work on this to an I/O thread. Much of the upper layer code assumes
+    // that all I/O-related things happen in an I/O thread.
+    if (poller == 0)    // Not really going yet...
+        return;
+
+    // Counted as an outstanding operation; completion() decrements the
+    // count once the resulting completion has been processed.
+    InterlockedIncrement(&opsInProgress);
+    // No socket here, only a completer: the handle carries a callback bound
+    // to completion() rather than something to associate with the poller.
+    IOHandlePrivate *hp =
+        new IOHandlePrivate (INVALID_SOCKET,
+                             boost::bind(&AsynchIO::completion, this, _1));
+    // h takes over hp; IOHandle's destructor deletes it when h goes out of
+    // scope at the end of this function.
+    IOHandle h(hp);
+    PollerHandle ph(h);
+    poller->addFd(ph, Poller::OUTPUT);
+}
+
+/*
+ * Request that the socket be closed once pending writes are done. If no
+ * write is in flight, request a write-wanted completion so that
+ * writeComplete() gets to act on the request.
+ */
+void AsynchIO::queueWriteClose() {
+    queuedClose = true;
+    if (writeInProgress)
+        return;
+    notifyPendingWrite();
+}
+
+// True when no buffers are waiting in the write queue.
+bool AsynchIO::writeQueueEmpty() {
+    QLock guard(bufferQueueLock);
+    return writeQueue.empty();
+}
+
+/**
+ * Hand out a spare buffer from the back of the buffer queue.
+ *
+ * One buffer is always left in the queue (it might hold data that was
+ * "unread"), so 0 is returned unless at least two buffers are queued.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+    QLock guard(bufferQueueLock);
+    if (bufferQueue.size() > 1) {
+        BufferBase* spare = bufferQueue.back();
+        assert(spare);
+        bufferQueue.pop_back();
+        return spare;
+    }
+    return 0;
+}
+
+/*
+ * Callback helpers. Each one invokes the matching user callback, but only
+ * if it is still set; queueForDeletion() may have cleared them.
+ */
+
+void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) {
+    if (!readCallback)
+        return;
+    readCallback(*this, buffer);
+}
+
+void AsynchIO::notifyEof(void) {
+    if (!eofCallback)
+        return;
+    eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+    if (!disCallback)
+        return;
+    disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+    if (!closedCallback)
+        return;
+    closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+    if (!emptyCallback)
+        return;
+    emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+    if (!idleCallback)
+        return;
+    idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
+/*
+ * Post an overlapped read into the next available buffer. Does nothing once
+ * deletion has been requested. With no buffer available the buffers-empty
+ * callback is invoked instead of posting a read.
+ */
+void AsynchIO::startRead(void) {
+    if (queuedDelete)
+        return;
+
+    // Take from the front of the queue: an "unread" buffer with data left
+    // over from last time would have been put there.
+    AsynchIO::BufferBase *buff = 0;
+    {
+        QLock guard(bufferQueueLock);
+        if (!bufferQueue.empty()) {
+            buff = bufferQueue.front();
+            assert(buff);
+            bufferQueue.pop_front();
+        }
+    }
+    if (buff == 0) {
+        notifyBuffersEmpty();
+        return;
+    }
+
+    // Read into whatever space remains after any data already in the buffer.
+    int readCount = buff->byteCount - buff->dataCount;
+    AsynchReadResult *result =
+        new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
+                             buff,
+                             readCount);
+    DWORD bytesReceived = 0, flags = 0;
+    InterlockedIncrement(&opsInProgress);
+    int status = WSARecv(toFd(socket.impl),
+                         const_cast<LPWSABUF>(result->getWSABUF()), 1,
+                         &bytesReceived,
+                         &flags,
+                         result->overlapped(),
+                         0);
+    // Status 0 or WSA_IO_PENDING: completion will handle the rest.
+    if (status == 0)
+        return;
+    int error = WSAGetLastError();
+    if (error != WSA_IO_PENDING) {
+        // Report the failure through the normal completion path; result
+        // must not be touched afterwards.
+        result->failure(error);
+    }
+}
+
+/*
+ * Post an overlapped write of buff->dataCount bytes from the given buffer.
+ *
+ * Marks a write as in progress and counts the operation as outstanding
+ * before WSASend is called. A send that fails outright (anything other than
+ * WSA_IO_PENDING) is reported through the result's failure() handler, so it
+ * still reaches completion() like any other result.
+ */
+void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
+    writeInProgress = true;
+    InterlockedIncrement(&opsInProgress);
+    AsynchWriteResult *result =
+        new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
+                              buff,
+                              buff->dataCount);
+    DWORD bytesSent = 0;
+    int status = WSASend(toFd(socket.impl),
+                         const_cast<LPWSABUF>(result->getWSABUF()), 1,
+                         &bytesSent,
+                         0,
+                         result->overlapped(),
+                         0);
+    if (status != 0) {
+        int error = WSAGetLastError();
+        if (error != WSA_IO_PENDING) {
+            result->failure(error);   // Also decrements in-progress count
+            result = 0;   // result is invalid here
+            return;
+        }
+    }
+    // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+}
+
+// Reports whether a write is currently in flight.
+bool AsynchIO::writesNotComplete()
+{
+    return writeInProgress;
+}
+
+/*
+ * Shut the socket, then tell the owner (via the closed callback, if set)
+ * that it has been done.
+ */
+void AsynchIO::close(void)
+{
+    socket.close();
+    notifyClosed();
+}
+
+/*
+ * Process a finished read. Called from completion(), so only one thread at
+ * a time runs this for a given connection.
+ *
+ * A successful read with data is passed to the read callback and the next
+ * read is posted. Otherwise the buffer goes back to the queue and the
+ * owner is told about end-of-file (status 0, no bytes) or a disconnect
+ * (non-zero status). No further read is posted in that case.
+ */
+void AsynchIO::readComplete(AsynchReadResult *result) {
+    ++threadReadCount;
+    int status = result->getStatus();
+    size_t bytes = result->getTransferred();
+    if (status == 0 && bytes > 0) {
+        threadReadTotal += bytes;
+        dispatchReadComplete(result->getBuff());
+        startRead();
+    }
+    else {
+        // No data read, so put the buffer back. It may be partially filled,
+        // so "unread" it back to the front of the queue.
+        unread(result->getBuff());
+        if (status == 0)
+            notifyEof();
+        else
+            notifyDisconnect();
+    }
+}
+
+/*
+ * NOTE - this completion is called for completed writes and also when
+ * a write is desired. The difference is in the buff - if a write is desired
+ * the buff is 0.
+ *
+ * For a completed write the buffer is either resubmitted (fewer bytes were
+ * written than requested) or returned to the read-buffer pool. Afterwards,
+ * if no write is in flight, the next queued buffer is written; if there is
+ * none, a queued close is carried out, or else the idle callback is invoked.
+ */
+void AsynchIO::writeComplete(AsynchWriteResult *result) {
+    int status = result->getStatus();
+    size_t bytes = result->getTransferred();
+    AsynchIO::BufferBase *buff = result->getBuff();
+    if (buff != 0) {
+        ++threadWriteCount;
+        writeInProgress = false;
+        if (status == 0 && bytes > 0) {
+            threadWriteTotal += bytes;
+            // startWrite() sets writeInProgress again, which keeps the
+            // queue-draining section below from running on a partial write.
+            if (bytes < result->getRequested()) // Still more to go; resubmit
+                startWrite(buff);
+            else
+                queueReadBuffer(buff);     // All done; back to the pool
+        }
+        else {
+            // An error... if it's a connection close, ignore it - it will be
+            // noticed and handled on a read completion any moment now.
+            // What to do with real error??? Save the Buffer?
+            // NOTE(review): on this path buff is neither requeued nor freed.
+        }
+    }
+
+    // If there are no writes outstanding, the priority is to write any
+    // remaining buffers first (either queued or via idle), then close the
+    // socket if that's queued.
+    // opsInProgress handled in completion()
+    if (!writeInProgress) {
+        bool writing = false;
+        {
+            // Lock covers the dequeue and the startWrite() call; close() and
+            // notifyIdle() below run with the lock released.
+            QLock l(bufferQueueLock);
+            if (writeQueue.size() > 0) {
+                buff = writeQueue.front();
+                assert(buff);
+                writeQueue.pop_front();
+                startWrite(buff);
+                writing = true;
+            }
+        }
+        if (!writing) {
+            if (queuedClose)
+                close();
+            else
+                notifyIdle();
+        }
+    }
+    return;
+}
+
+/*
+ * Entry point for every finished (or injected) operation on this object.
+ *
+ * If another thread is already processing completions for this connection
+ * the result is queued for that thread and this call returns at once.
+ * Otherwise this thread becomes the worker: it processes the given result
+ * and then everything queued in the meantime, dropping completionLock while
+ * each result is being handled. Each processed result is deleted and
+ * opsInProgress is decremented for it.
+ *
+ * When the worker is done and deletion was requested with no operations
+ * left outstanding, the object deletes itself.
+ */
+void AsynchIO::completion(AsynchIoResult *result) {
+    {
+        ScopedLock<Mutex> l(completionLock);
+        if (working) {
+            completionQueue.push(result);
+            return;
+        }
+
+        // First thread in with something to do; note we're working then keep
+        // handling completions.
+        working = true;
+        while (result != 0) {
+            // New scope to unlock temporarily.
+            {
+                ScopedUnlock<Mutex> ul(completionLock);
+                // Reads are recognized by type; anything else is handed to
+                // writeComplete() as a write result.
+                AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
+                if (r != 0)
+                    readComplete(r);
+                else {
+                    AsynchWriteResult *w =
+                        dynamic_cast<AsynchWriteResult*>(result);
+                    writeComplete(w);
+                }
+                delete result;
+                result = 0;
+                InterlockedDecrement(&opsInProgress);
+            }
+            // Lock is held again.
+            // result is 0 here, so 'continue' ends the loop when nothing
+            // more has been queued.
+            if (completionQueue.empty())
+                continue;
+            result = completionQueue.front();
+            completionQueue.pop();
+        }
+        working = false;
+    }
+    // Lock released; ok to delete if all is done.
+    if (opsInProgress == 0 && queuedDelete)
+        delete this;
+}
+
+} // namespace windows
+
+/*
+ * Factory for the platform-neutral AsynchIO interface on Windows: allocates
+ * a windows::AsynchIO wired to the given socket and callbacks and returns
+ * it as a qpid::sys::AsynchIO pointer. This mirrors the factory provided by
+ * the POSIX implementation.
+ */
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+                                      AsynchIO::ReadCallback rCb,
+                                      AsynchIO::EofCallback eofCb,
+                                      AsynchIO::DisconnectCallback disCb,
+                                      AsynchIO::ClosedCallback cCb,
+                                      AsynchIO::BuffersEmptyCallback eCb,
+                                      AsynchIO::IdleCallback iCb)
+{
+    return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}}  // namespace qpid::sys

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,187 @@
+#ifndef _windows_asynchIoResult_h
+#define _windows_asynchIoResult_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include <memory.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+namespace qpid {
+namespace sys {
+
+/*
+ * AsynchResult defines the class that receives the result of an
+ * asynchronous I/O operation, either send/recv or accept/connect.
+ *
+ * Operation factories should set one of these up before beginning the
+ * operation. Poller knows how to dispatch completion to this class.
+ * This class must be subclassed for needed operations; this class provides
+ * an interface only and cannot be instantiated.
+ *
+ * This class is tied to Windows; it inherits from OVERLAPPED so that the
+ * IocpPoller can cast OVERLAPPED pointers back to AsynchResult and call
+ * the completion handler.
+ */
+class AsynchResult : private OVERLAPPED {
+public:
+    // The OVERLAPPED to pass to the Windows call that starts the operation.
+    LPOVERLAPPED overlapped(void) { return this; }
+    // Recover the result object from the OVERLAPPED pointer handed back on
+    // completion; ol must have come from overlapped() above.
+    static AsynchResult* from_overlapped(LPOVERLAPPED ol) {
+        return static_cast<AsynchResult*>(ol);
+    }
+    // Record a successful outcome and run the subclass's complete().
+    virtual void success (size_t bytesTransferred) {
+        bytes = bytesTransferred;
+        status = 0;
+        complete();
+    }
+    // Record a failed outcome (no bytes, given error) and run complete().
+    virtual void failure (int error) {
+        bytes = 0;
+        status = error;
+        complete();
+    }
+    size_t getTransferred(void) const { return bytes; }
+    int getStatus(void) const { return status; }
+
+protected:
+    // Zero the OVERLAPPED part before the structure is handed to Windows.
+    AsynchResult() : bytes(0), status(0)
+      { memset(overlapped(), 0, sizeof(OVERLAPPED)); }
+    // Protected and non-virtual: objects are not deleted through an
+    // AsynchResult pointer.
+    ~AsynchResult() {}
+    // Operation-specific handling, run once the outcome has been recorded.
+    virtual void complete(void) = 0;
+
+    size_t bytes;   // Bytes transferred; 0 on failure
+    int status;     // 0 on success, else the error passed to failure()
+};
+
+class AsynchAcceptorPrivate;
+
+/*
+ * Tracks one outstanding accept operation: the socket created for the
+ * incoming connection, the callback to give it to, the acceptor to restart
+ * afterwards, and the address buffer AcceptEx writes into.
+ *
+ * success() and failure() are overridden with the complete handling, so
+ * complete() itself does nothing for this class.
+ */
+class AsynchAcceptResult : public AsynchResult {
+
+    friend class AsynchAcceptorPrivate;
+
+public:
+    AsynchAcceptResult(AsynchAcceptor::Callback cb,
+                       AsynchAcceptorPrivate *acceptor,
+                       SOCKET listener);
+    virtual void success (size_t bytesTransferred);
+    virtual void failure (int error);
+
+private:
+    virtual void complete(void) {}  // No-op for this class.
+
+    std::auto_ptr<qpid::sys::Socket> newSocket;
+    AsynchAcceptor::Callback callback;
+    AsynchAcceptorPrivate *acceptor;
+    SOCKET listener;
+
+    // AcceptEx needs a place to write the local and remote addresses
+    // when accepting the connection. Place those here; get enough for
+    // IPv6 addresses, even if the socket is IPv4. AcceptEx wants 16 bytes
+    // more than the largest address for each of the two.
+    // (sizeof applied to a type name needs the parentheses.)
+    enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
+           SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
+    char addressBuffer[SOCKADDRBUFLEN];
+};
+
+class AsynchIO;
+
+/*
+ * Common base for read and write results. Holds the buffer the operation
+ * works on, the number of bytes originally requested, the WSABUF describing
+ * the transfer, and the callback to invoke when the operation is done.
+ * The WSABUF is filled in by the subclass constructors.
+ */
+class AsynchIoResult : public AsynchResult {
+public:
+    // Signature of the function a result is handed to when it completes.
+    typedef boost::function1<void, AsynchIoResult *> Completer;
+
+    virtual ~AsynchIoResult() {}
+    AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+    size_t getRequested(void) const { return requested; }
+    const WSABUF *getWSABUF(void) const { return &wsabuf; }
+
+protected:
+    void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+
+protected:
+    AsynchIoResult(Completer cb,
+                   AsynchIO::BufferBase *buff, size_t length)
+      : completionCallback(cb), iobuff(buff), requested(length) {}
+
+    // Subclasses adjust the buffer bookkeeping, then call completionCallback.
+    virtual void complete(void) = 0;
+    WSABUF wsabuf;
+    Completer completionCallback;
+
+private:
+    AsynchIO::BufferBase *iobuff;
+    size_t  requested;     // Number of bytes in original I/O request
+};
+
+/*
+ * Result of one read. The transfer targets the free space that follows the
+ * data already in the buffer.
+ */
+class AsynchReadResult : public AsynchIoResult {
+public:
+    AsynchReadResult(AsynchIoResult::Completer cb,
+                     AsynchIO::BufferBase *buff,
+                     size_t length)
+      : AsynchIoResult(cb, buff, length)
+    {
+        wsabuf.len = length;
+        wsabuf.buf = buff->bytes + buff->dataCount;
+    }
+
+private:
+    // Account for the bytes just received, then pass the result on.
+    virtual void complete(void) {
+        AsynchIO::BufferBase *filled = getBuff();
+        filled->dataCount += bytes;
+        completionCallback(this);
+    }
+};
+
+/*
+ * Result of one write of 'length' bytes from the given buffer.
+ *
+ * The data to send starts at bytes + dataStart. complete() advances
+ * dataStart past the bytes actually written, so when a partially written
+ * buffer is resubmitted the next transfer continues where the previous one
+ * stopped instead of starting over at the beginning of the buffer.
+ *
+ * A null buffer is allowed (see AsynchWriteWanted); the WSABUF then has a
+ * null data pointer.
+ */
+class AsynchWriteResult : public AsynchIoResult {
+
+    // complete() updates buffer then does completion callback.
+    virtual void complete(void) {
+        AsynchIO::BufferBase *b = getBuff();
+        b->dataStart += bytes;
+        b->dataCount -= bytes;
+        completionCallback(this);
+    }
+
+public:
+    AsynchWriteResult(AsynchIoResult::Completer cb,
+                      AsynchIO::BufferBase *buff,
+                      size_t length)
+      : AsynchIoResult(cb, buff, length) {
+        wsabuf.buf = buff ? buff->bytes + buff->dataStart : 0;
+        wsabuf.len = length;
+    }
+};
+
+/*
+ * A write result with no buffer attached: a request for the completer to
+ * be called rather than the outcome of an actual transfer.
+ */
+class AsynchWriteWanted : public AsynchWriteResult {
+public:
+    AsynchWriteWanted(AsynchIoResult::Completer cb)
+      : AsynchWriteResult(cb, 0, 0)
+    {
+        wsabuf.len = 0;
+        wsabuf.buf = 0;
+    }
+
+private:
+    // No buffer bookkeeping to do; just pass the result on.
+    virtual void complete(void) {
+        completionCallback(this);
+    }
+};
+
+}}
+
+#endif  /*!_windows_asynchIoResult_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+#include "IoHandlePrivate.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+// Extract the raw socket handle from the private handle data.
+SOCKET toFd(const IOHandlePrivate* h)
+{
+    const SOCKET raw = h->fd;
+    return raw;
+}
+
+// The IOHandle takes over the private data it is given ...
+IOHandle::IOHandle(IOHandlePrivate* h)
+  : impl(h)
+{
+}
+
+// ... and deletes it when the handle itself goes away.
+IOHandle::~IOHandle()
+{
+    delete impl;
+}
+
+}} // namespace qpid::sys

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,52 @@
+#ifndef _sys_windows_IoHandlePrivate_h
+#define _sys_windows_IoHandlePrivate_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+// Windows-specific data behind an IOHandle.
+// An instance is expected to carry either a valid socket handle or a
+// completer callback. The socket handle is what gets associated with the
+// poller's iocp; the completer is used to inject a completion that will
+// very quickly trigger a callback to the completer from an I/O thread.
+class IOHandlePrivate {
+public:
+    // Both arguments are optional: by default there is no socket
+    // (INVALID_SOCKET) and no completer callback (0).
+    IOHandlePrivate(SOCKET f = INVALID_SOCKET, AsynchIoResult::Completer cb = 0)
+        : fd(f), event(cb) {}
+
+    SOCKET fd;                        // Socket handle, or INVALID_SOCKET.
+    AsynchIoResult::Completer event;  // Completer callback, or 0 if unset.
+};
+
+SOCKET toFd(const IOHandlePrivate* h);
+
+}}
+
+#endif /* _sys_windows_IoHandlePrivate_h */

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Dispatcher.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+// Store the poller (a Poller::shared_ptr) that run() waits on.
+Dispatcher::Dispatcher(Poller::shared_ptr poller0) : poller(poller0)
+{}
+
+// Nothing to release explicitly; the destructor body is intentionally empty.
+Dispatcher::~Dispatcher() {}
+    
+// Wait on the poller repeatedly until it reports a SHUTDOWN event.
+// No per-event work is done in this loop for INVALID events.
+void Dispatcher::run() {
+    for (;;) {
+        const Poller::Event completion = poller->wait();
+
+        // A shutdown event ends the dispatch loop.
+        if (completion.type == Poller::SHUTDOWN)
+            return;
+
+        // INVALID is seen on any type of success or fail completion;
+        // any other event type should be impossible here.
+        if (completion.type != Poller::INVALID) {
+            assert(false);
+        }
+    }
+}
+
+}}