You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2008/10/21 20:29:51 UTC
svn commit: r706709 [1/2] - in /incubator/qpid/trunk/qpid/cpp/src: ./
qpid/client/ qpid/log/ qpid/sys/ qpid/sys/epoll/ qpid/sys/posix/
qpid/sys/solaris/ qpid/sys/windows/ tests/
Author: shuston
Date: Tue Oct 21 11:29:44 2008
New Revision: 706709
URL: http://svn.apache.org/viewvc?rev=706709&view=rev
Log:
Refactor sys::AsynchIO class to allow reimplementing on other platforms without affecting upper level usage. Resolves QPID-1377 and supplies Windows AsynchIO.cpp
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Oct 21 11:29:44 2008
@@ -283,6 +283,7 @@
qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
+ qpid/sys/DispatchHandle.cpp \
qpid/sys/PollableCondition.h \
qpid/sys/PollableQueue.h \
qpid/sys/Runnable.cpp \
@@ -609,6 +610,7 @@
qpid/sys/ConnectionOutputHandlerPtr.h \
qpid/sys/DeletionManager.h \
qpid/sys/Dispatcher.h \
+ qpid/sys/DispatchHandle.h \
qpid/sys/FileSysDir.h \
qpid/sys/IntegerTypes.h \
qpid/sys/IOHandle.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Tue Oct 21 11:29:44 2008
@@ -22,7 +22,6 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Socket.h"
-#include <sys/socket.h>
namespace qpid {
namespace client {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Oct 21 11:29:44 2008
@@ -200,7 +200,7 @@
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
poller = Poller::shared_ptr(new Poller);
- aio = new AsynchIO(socket,
+ aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::eof, this, _1),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Tue Oct 21 11:29:44 2008
@@ -24,7 +24,7 @@
namespace qpid {
namespace log {
-class Options;
+struct Options;
/**
* A selector identifies the set of log messages to enable.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Tue Oct 21 11:29:44 2008
@@ -21,7 +21,8 @@
*
*/
-#include "Dispatcher.h"
+// @@TODO: TAKE THIS OUT... SHould be in posix version.
+#include "DispatchHandle.h"
#include <boost/function.hpp>
#include <deque>
@@ -35,48 +36,45 @@
* Asynchronous acceptor: accepts connections then does a callback with the
* accepted fd
*/
+class AsynchAcceptorPrivate;
class AsynchAcceptor {
public:
typedef boost::function1<void, const Socket&> Callback;
private:
- Callback acceptedCallback;
- DispatchHandle handle;
- const Socket& socket;
+ AsynchAcceptorPrivate* impl;
public:
AsynchAcceptor(const Socket& s, Callback callback);
+ ~AsynchAcceptor();
void start(Poller::shared_ptr poller);
-
-private:
- void readable(DispatchHandle& handle);
};
/*
* Asynchronous connector: starts the process of initiating a connection and
* invokes a callback when completed or failed.
*/
-class AsynchConnector : private DispatchHandle {
+class AsynchConnector {
public:
typedef boost::function1<void, const Socket&> ConnectedCallback;
typedef boost::function2<void, int, std::string> FailedCallback;
-private:
- ConnectedCallback connCallback;
- FailedCallback failCallback;
- const Socket& socket;
-
-public:
- AsynchConnector(const Socket& socket,
- Poller::shared_ptr poller,
- std::string hostname,
- uint16_t port,
- ConnectedCallback connCb,
- FailedCallback failCb = 0);
-
-private:
- void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
+ // Call create() to allocate a new AsynchConnector object with the
+ // specified poller, addressing, and callbacks.
+ // This method is implemented in platform-specific code to
+ // create a correctly typed object. The platform code also manages
+ // deletes. To correctly manage heaps when needed, the allocate and
+ // delete should both be done from the same class/library.
+ static AsynchConnector* create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+
+protected:
+ AsynchConnector() {}
+ virtual ~AsynchConnector() {}
};
struct AsynchIOBufferBase {
@@ -99,16 +97,14 @@
/*
* Asychronous reader/writer:
* Reader accepts buffers to read into; reads into the provided buffers
- * and then does a callback with the buffer and amount read. Optionally it can callback
- * when there is something to read but no buffer to read it into.
+ * and then does a callback with the buffer and amount read. Optionally it
+ * can callback when there is something to read but no buffer to read it into.
*
* Writer accepts a buffer and queues it for writing; can also be given
- * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
- *
- * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
- * the contained DispatchHandle
+ * a callback for when writing is "idle" (ie fd is writable, but nothing
+ * to write).
*/
-class AsynchIO : private DispatchHandle {
+class AsynchIO {
public:
typedef AsynchIOBufferBase BufferBase;
@@ -119,46 +115,35 @@
typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
-private:
- ReadCallback readCallback;
- EofCallback eofCallback;
- DisconnectCallback disCallback;
- ClosedCallback closedCallback;
- BuffersEmptyCallback emptyCallback;
- IdleCallback idleCallback;
- const Socket& socket;
- std::deque<BufferBase*> bufferQueue;
- std::deque<BufferBase*> writeQueue;
- bool queuedClose;
- /**
- * This flag is used to detect and handle concurrency between
- * calls to notifyPendingWrite() (which can be made from any thread) and
- * the execution of the writeable() method (which is always on the
- * thread processing this handle.
- */
- volatile bool writePending;
-
-public:
- AsynchIO(const Socket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
- void queueForDeletion();
-
- void start(Poller::shared_ptr poller);
- void queueReadBuffer(BufferBase* buff);
- void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff);
- void notifyPendingWrite();
- void queueWriteClose();
- bool writeQueueEmpty() { return writeQueue.empty(); }
- BufferBase* getQueuedBuffer();
-
-private:
- ~AsynchIO();
- void readable(DispatchHandle& handle);
- void writeable(DispatchHandle& handle);
- void disconnected(DispatchHandle& handle);
- void close(DispatchHandle& handle);
+ // Call create() to allocate a new AsynchIO object with the specified
+ // callbacks. This method is implemented in platform-specific code to
+ // create a correctly typed object. The platform code also manages
+ // deletes. To correctly manage heaps when needed, the allocate and
+ // delete should both be done from the same class/library.
+ static AsynchIO* create(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+public:
+ virtual void queueForDeletion() = 0;
+
+ virtual void start(Poller::shared_ptr poller) = 0;
+ virtual void queueReadBuffer(BufferBase* buff) = 0;
+ virtual void unread(BufferBase* buff) = 0;
+ virtual void queueWrite(BufferBase* buff) = 0;
+ virtual void notifyPendingWrite() = 0;
+ virtual void queueWriteClose() = 0;
+ virtual bool writeQueueEmpty() = 0;
+ virtual BufferBase* getQueuedBuffer() = 0;
+
+protected:
+ // Derived class manages lifetime; must be constructed using the
+ // static create() method. Deletes not allowed from outside.
+ AsynchIO() {}
+ virtual ~AsynchIO() {}
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Oct 21 11:29:44 2008
@@ -33,7 +33,7 @@
namespace sys {
class AsynchIO;
-class AsynchIOBufferBase;
+struct AsynchIOBufferBase;
class Socket;
class AsynchIOHandler : public OutputControl {
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "DispatchHandle.h"
+
+#include <boost/cast.hpp>
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+DispatchHandle::~DispatchHandle() {
+ stopWatch();
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ ScopedLock<Mutex> lock(stateLock);
+ assert(state == IDLE);
+
+ // If no callbacks set then do nothing (that is what we were asked to do!)
+ // TODO: Maybe this should be an assert instead
+ if (!r && !w) {
+ state = INACTIVE;
+ return;
+ }
+
+ Poller::Direction d = r ?
+ (w ? Poller::INOUT : Poller::INPUT) :
+ Poller::OUTPUT;
+
+ poller = poller0;
+ poller->addFd(*this, d);
+
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+}
+
+void DispatchHandle::rewatch() {
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_INACTIVE:
+ state = r ?
+ (w ? DELAYED_RW : DELAYED_R) :
+ DELAYED_W;
+ break;
+ case DELAYED_DELETE:
+ break;
+ case INACTIVE:
+ case ACTIVE_R:
+ case ACTIVE_W: {
+ assert(poller);
+ Poller::Direction d = r ?
+ (w ? Poller::INOUT : Poller::INPUT) :
+ Poller::OUTPUT;
+ poller->modFd(*this, d);
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+ break;
+ }
+ case DELAYED_RW:
+ case ACTIVE_RW:
+ // Don't need to do anything already waiting for readable/writable
+ break;
+ }
+}
+
+void DispatchHandle::rewatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_R:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
+ break;
+ case DELAYED_W:
+ state = DELAYED_RW;
+ break;
+ case DELAYED_INACTIVE:
+ state = DELAYED_R;
+ break;
+ case ACTIVE_R:
+ case ACTIVE_RW:
+ // Nothing to do: already waiting for readable
+ break;
+ case INACTIVE:
+ assert(poller);
+ poller->modFd(*this, Poller::INPUT);
+ state = ACTIVE_R;
+ break;
+ case ACTIVE_W:
+ assert(poller);
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ }
+}
+
+void DispatchHandle::rewatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
+ break;
+ case DELAYED_R:
+ state = DELAYED_RW;
+ break;
+ case DELAYED_INACTIVE:
+ state = DELAYED_W;
+ break;
+ case INACTIVE:
+ assert(poller);
+ poller->modFd(*this, Poller::OUTPUT);
+ state = ACTIVE_W;
+ break;
+ case ACTIVE_R:
+ assert(poller);
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ case ACTIVE_W:
+ case ACTIVE_RW:
+ // Nothing to do: already waiting for writable
+ break;
+ }
+}
+
+void DispatchHandle::unwatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_R:
+ state = DELAYED_INACTIVE;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_W;
+ break;
+ case DELAYED_W:
+ case DELAYED_INACTIVE:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_R:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::OUTPUT);
+ state = ACTIVE_W;
+ break;
+ case ACTIVE_W:
+ case INACTIVE:
+ break;
+ }
+}
+
+void DispatchHandle::unwatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_W:
+ state = DELAYED_INACTIVE;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_R;
+ break;
+ case DELAYED_R:
+ case DELAYED_INACTIVE:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_W:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::INPUT);
+ state = ACTIVE_R;
+ break;
+ case ACTIVE_R:
+ case INACTIVE:
+ break;
+ }
+}
+
+void DispatchHandle::unwatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ break;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ state = DELAYED_INACTIVE;
+ break;
+ case DELAYED_DELETE:
+ break;
+ default:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ }
+}
+
+void DispatchHandle::stopWatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
+ return;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ state = DELAYED_IDLE;
+ break;
+ default:
+ state = IDLE;
+ break;
+ }
+ assert(poller);
+ poller->delFd(*this);
+ poller.reset();
+}
+
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+ // Ensure that we're no longer watching anything
+ stopWatch();
+
+ // If we're in the middle of a callback defer the delete
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
+ state = DELAYED_DELETE;
+ return;
+ case IDLE:
+ break;
+ default:
+ // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+ assert(false);
+ }
+ }
+ // If we're not then do it right away
+ delete this;
+}
+
+void DispatchHandle::processEvent(Poller::EventType type) {
+ // Note that we are now doing the callbacks
+ {
+ ScopedLock<Mutex> lock(stateLock);
+
+ // Set up to wait for same events next time unless reset
+ switch(state) {
+ case ACTIVE_R:
+ state = DELAYED_R;
+ break;
+ case ACTIVE_W:
+ state = DELAYED_W;
+ break;
+ case ACTIVE_RW:
+ state = DELAYED_RW;
+ break;
+ // Can only get here in a DELAYED_* state in the rare case
+ // that we're already here for reading and we get activated for
+ // writing and we can write (it might be possible the other way
+ // round too). In this case we're already processing the handle
+ // in a different thread in this function so return right away
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
+ return;
+ default:
+ assert(false);
+ }
+ }
+
+ // Do callbacks - whilst we are doing the callbacks we are prevented from processing
+ // the same handle until we re-enable it. To avoid rentering the callbacks for a single
+ // handle re-enabling in the callbacks is actually deferred until they are complete.
+ switch (type) {
+ case Poller::READABLE:
+ readableCallback(*this);
+ break;
+ case Poller::WRITABLE:
+ writableCallback(*this);
+ break;
+ case Poller::READ_WRITABLE:
+ readableCallback(*this);
+ writableCallback(*this);
+ break;
+ case Poller::DISCONNECTED:
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ state = DELAYED_INACTIVE;
+ }
+ if (disconnectedCallback) {
+ disconnectedCallback(*this);
+ }
+ break;
+ default:
+ assert(false);
+ }
+
+ // If any of the callbacks re-enabled reading/writing then actually
+ // do it now
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case DELAYED_R:
+ poller->modFd(*this, Poller::INPUT);
+ state = ACTIVE_R;
+ return;
+ case DELAYED_W:
+ poller->modFd(*this, Poller::OUTPUT);
+ state = ACTIVE_W;
+ return;
+ case DELAYED_RW:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ return;
+ case DELAYED_INACTIVE:
+ state = INACTIVE;
+ return;
+ case DELAYED_IDLE:
+ state = IDLE;
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ return;
+ case DELAYED_DELETE:
+ break;
+ }
+ }
+ delete this;
+}
+
+}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,146 @@
+#ifndef _sys_DispatchHandle_h
+#define _sys_DispatchHandle_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Poller.h"
+#include "Mutex.h"
+
+#include <boost/function.hpp>
+
+
+namespace qpid {
+namespace sys {
+
+class DispatchHandleRef;
+/**
+ * In order to have your own handle (file descriptor on Unix) watched by the poller
+ * you need to:
+ *
+ * - Subclass IOHandle, in the constructor supply an appropriate
+ * IOHandlerPrivate object for the platform.
+ *
+ * - Construct a DispatchHandle passing it your IOHandle and
+ * callback functions for read, write and disconnect events.
+ *
+ * - Ensure the DispatchHandle is not deleted until the poller is no longer using it.
+ * TODO: astitcher document DispatchHandleRef to simplify this.
+ *
+ * When an event occurs on the handle, the poller calls the relevant callback and
+ * stops watching that handle. Your callback can call rewatch() or related functions
+ * to re-enable polling.
+ */
+class DispatchHandle : public PollerHandle {
+ friend class DispatchHandleRef;
+public:
+ typedef boost::function1<void, DispatchHandle&> Callback;
+
+private:
+ Callback readableCallback;
+ Callback writableCallback;
+ Callback disconnectedCallback;
+ Poller::shared_ptr poller;
+ Mutex stateLock;
+ enum {
+ IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+ DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
+ DELAYED_DELETE
+ } state;
+
+public:
+ /**
+ * Provide a handle to poll and a set of callbacks. Note
+ * callbacks can be 0, meaning you are not interested in that
+ * event.
+ *
+ *@param h: the handle to watch. The IOHandle encapsulates a
+ * platfrom-specific handle to an IO object (e.g. a file descriptor
+ * on Unix.)
+ *@param rCb Callback called when the handle is readable.
+ *@param wCb Callback called when the handle is writable.
+ *@param dCb Callback called when the handle is disconnected.
+ */
+ DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
+ readableCallback(rCb),
+ writableCallback(wCb),
+ disconnectedCallback(dCb),
+ state(IDLE)
+ {}
+
+ ~DispatchHandle();
+
+ /** Add this DispatchHandle to the poller to be watched. */
+ void startWatch(Poller::shared_ptr poller);
+
+ /** Resume watchingn for all non-0 callbacks. */
+ void rewatch();
+ /** Resume watchingn for read only. */
+ void rewatchRead();
+
+ /** Resume watchingn for write only. */
+ void rewatchWrite();
+
+ /** Stop watching temporarily. The DispatchHandle remains
+ associated with the poller and can be re-activated using
+ rewatch. */
+ void unwatch();
+ /** Stop watching for read */
+ void unwatchRead();
+ /** Stop watching for write */
+ void unwatchWrite();
+
+ /** Stop watching permanently. Disassociates from the poller. */
+ void stopWatch();
+
+protected:
+ /** Override to get extra processing done when the DispatchHandle is deleted. */
+ void doDelete();
+
+private:
+ void processEvent(Poller::EventType dir);
+};
+
+class DispatchHandleRef {
+ DispatchHandle* ref;
+
+public:
+ typedef boost::function1<void, DispatchHandle&> Callback;
+ DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ ref(new DispatchHandle(h, rCb, wCb, dCb))
+ {}
+
+ ~DispatchHandleRef() { ref->doDelete(); }
+
+ void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); }
+ void rewatch() { ref->rewatch(); }
+ void rewatchRead() { ref->rewatchRead(); }
+ void rewatchWrite() { ref->rewatchWrite(); }
+ void unwatch() { ref->unwatch(); }
+ void unwatchRead() { ref->unwatchRead(); }
+ void unwatchWrite() { ref->unwatchWrite(); }
+ void stopWatch() { ref->stopWatch(); }
+};
+
+}}
+
+#endif // _sys_DispatchHandle_h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Tue Oct 21 11:29:44 2008
@@ -21,8 +21,6 @@
#include "Dispatcher.h"
-#include <boost/cast.hpp>
-
#include <assert.h>
namespace qpid {
@@ -58,382 +56,4 @@
;
}
-DispatchHandle::~DispatchHandle() {
- stopWatch();
-}
-
-void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
- bool r = readableCallback;
- bool w = writableCallback;
-
- ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE);
-
- // If no callbacks set then do nothing (that is what we were asked to do!)
- // TODO: Maybe this should be an assert instead
- if (!r && !w) {
- state = INACTIVE;
- return;
- }
-
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::IN) :
- Poller::OUT;
-
- poller = poller0;
- poller->addFd(*this, d);
-
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
-}
-
-void DispatchHandle::rewatch() {
- bool r = readableCallback;
- bool w = writableCallback;
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_INACTIVE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- break;
- case DELAYED_DELETE:
- break;
- case INACTIVE:
- case ACTIVE_R:
- case ACTIVE_W: {
- assert(poller);
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::IN) :
- Poller::OUT;
- poller->modFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- break;
- }
- case DELAYED_RW:
- case ACTIVE_RW:
- // Don't need to do anything already waiting for readable/writable
- break;
- }
-}
-
-void DispatchHandle::rewatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_W:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_R;
- break;
- case ACTIVE_R:
- case ACTIVE_RW:
- // Nothing to do: already waiting for readable
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- break;
- }
-}
-
-void DispatchHandle::rewatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_R:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_W;
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- break;
- case ACTIVE_W:
- case ACTIVE_RW:
- // Nothing to do: already waiting for writable
- break;
- }
-}
-
-void DispatchHandle::unwatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_W;
- break;
- case DELAYED_W:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_W:
- case INACTIVE:
- break;
- }
-}
-
-void DispatchHandle::unwatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_R;
- break;
- case DELAYED_R:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- break;
- case ACTIVE_R:
- case INACTIVE:
- break;
- }
-}
-
-void DispatchHandle::unwatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_DELETE:
- break;
- default:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- }
-}
-
-void DispatchHandle::stopWatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_IDLE;
- break;
- default:
- state = IDLE;
- break;
- }
- assert(poller);
- poller->delFd(*this);
- poller.reset();
-}
-
-// The slightly strange switch structure
-// is to ensure that the lock is released before
-// we do the delete
-void DispatchHandle::doDelete() {
- // Ensure that we're no longer watching anything
- stopWatch();
-
- // If we're in the middle of a callback defer the delete
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- state = DELAYED_DELETE;
- return;
- case IDLE:
- break;
- default:
- // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
- assert(false);
- }
- }
- // If we're not then do it right away
- delete this;
-}
-
-void DispatchHandle::processEvent(Poller::EventType type) {
- // Note that we are now doing the callbacks
- {
- ScopedLock<Mutex> lock(stateLock);
-
- // Set up to wait for same events next time unless reset
- switch(state) {
- case ACTIVE_R:
- state = DELAYED_R;
- break;
- case ACTIVE_W:
- state = DELAYED_W;
- break;
- case ACTIVE_RW:
- state = DELAYED_RW;
- break;
- // Can only get here in a DELAYED_* state in the rare case
- // that we're already here for reading and we get activated for
- // writing and we can write (it might be possible the other way
- // round too). In this case we're already processing the handle
- // in a different thread in this function so return right away
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
- default:
- assert(false);
- }
- }
-
- // Do callbacks - whilst we are doing the callbacks we are prevented from processing
- // the same handle until we re-enable it. To avoid rentering the callbacks for a single
- // handle re-enabling in the callbacks is actually deferred until they are complete.
- switch (type) {
- case Poller::READABLE:
- readableCallback(*this);
- break;
- case Poller::WRITABLE:
- writableCallback(*this);
- break;
- case Poller::READ_WRITABLE:
- readableCallback(*this);
- writableCallback(*this);
- break;
- case Poller::DISCONNECTED:
- {
- ScopedLock<Mutex> lock(stateLock);
- state = DELAYED_INACTIVE;
- }
- if (disconnectedCallback) {
- disconnectedCallback(*this);
- }
- break;
- default:
- assert(false);
- }
-
- // If any of the callbacks re-enabled reading/writing then actually
- // do it now
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case DELAYED_R:
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- return;
- case DELAYED_W:
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- return;
- case DELAYED_RW:
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- return;
- case DELAYED_INACTIVE:
- state = INACTIVE;
- return;
- case DELAYED_IDLE:
- state = IDLE;
- return;
- default:
- // This should be impossible
- assert(false);
- return;
- case DELAYED_DELETE:
- break;
- }
- }
- delete this;
-}
-
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Tue Oct 21 11:29:44 2008
@@ -24,129 +24,10 @@
#include "Poller.h"
#include "Runnable.h"
-#include "Mutex.h"
-
-#include <memory>
-#include <queue>
-#include <boost/function.hpp>
-
-#include <assert.h>
-
namespace qpid {
namespace sys {
-class DispatchHandleRef;
-/**
- * In order to have your own handle (file descriptor on Unix) watched by the poller
- * you need to:
- *
- * - Subclass IOHandle, in the constructor supply an appropriate
- * IOHandlerPrivate object for the platform.
- *
- * - Construct a DispatchHandle passing it your IOHandle and
- * callback functions for read, write and disconnect events.
- *
- * - Ensure the DispatchHandle is not deleted until the poller is no longer using it.
- * TODO: astitcher document DispatchHandleRef to simplify this.
- *
- * When an event occurs on the handle, the poller calls the relevant callback and
- * stops watching that handle. Your callback can call rewatch() or related functions
- * to re-enable polling.
- */
-class DispatchHandle : public PollerHandle {
- friend class DispatchHandleRef;
-public:
- typedef boost::function1<void, DispatchHandle&> Callback;
-
-private:
- Callback readableCallback;
- Callback writableCallback;
- Callback disconnectedCallback;
- Poller::shared_ptr poller;
- Mutex stateLock;
- enum {
- IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
- DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
- DELAYED_DELETE
- } state;
-
-public:
- /**
- * Provide a handle to poll and a set of callbacks. Note
- * callbacks can be 0, meaning you are not interested in that
- * event.
- *
- *@param h: the handle to watch. The IOHandle encapsulates a
- * platfrom-specific handle to an IO object (e.g. a file descriptor
- * on Unix.)
- *@param rCb Callback called when the handle is readable.
- *@param wCb Callback called when the handle is writable.
- *@param dCb Callback called when the handle is disconnected.
- */
- DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(h),
- readableCallback(rCb),
- writableCallback(wCb),
- disconnectedCallback(dCb),
- state(IDLE)
- {}
-
- ~DispatchHandle();
-
- /** Add this DispatchHandle to the poller to be watched. */
- void startWatch(Poller::shared_ptr poller);
-
- /** Resume watchingn for all non-0 callbacks. */
- void rewatch();
- /** Resume watchingn for read only. */
- void rewatchRead();
-
- /** Resume watchingn for write only. */
- void rewatchWrite();
-
- /** Stop watching temporarily. The DispatchHandle remains
- associated with the poller and can be re-activated using
- rewatch. */
- void unwatch();
- /** Stop watching for read */
- void unwatchRead();
- /** Stop watching for write */
- void unwatchWrite();
-
- /** Stop watching permanently. Disassociates from the poller. */
- void stopWatch();
-
-protected:
- /** Override to get extra processing done when the DispatchHandle is deleted. */
- void doDelete();
-
-private:
- void processEvent(Poller::EventType dir);
-};
-
-class DispatchHandleRef {
- DispatchHandle* ref;
-
-public:
- typedef boost::function1<void, DispatchHandle&> Callback;
- DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
- ref(new DispatchHandle(h, rCb, wCb, dCb))
- {}
-
- ~DispatchHandleRef() { ref->doDelete(); }
-
- void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); }
- void rewatch() { ref->rewatch(); }
- void rewatchRead() { ref->rewatchRead(); }
- void rewatchWrite() { ref->rewatchWrite(); }
- void unwatch() { ref->unwatch(); }
- void unwatchRead() { ref->unwatchRead(); }
- void unwatchWrite() { ref->unwatchWrite(); }
- void stopWatch() { ref->stopWatch(); }
-};
-
-
class Dispatcher : public Runnable {
const Poller::shared_ptr poller;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Oct 21 11:29:44 2008
@@ -45,8 +45,8 @@
enum Direction {
NONE = 0,
- IN,
- OUT,
+ INPUT,
+ OUTPUT,
INOUT
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Oct 21 11:29:44 2008
@@ -92,13 +92,14 @@
if (isClient)
async->setClient();
- AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, 4);
aio->start(poller);
@@ -133,9 +134,13 @@
// is no longer needed.
Socket* socket = new Socket();
- new AsynchConnector (*socket, poller, host, port,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true),
- failed);
+ AsynchConnector::create (*socket,
+ poller,
+ host,
+ port,
+ boost::bind(&AsynchIOProtocolFactory::established,
+ this, poller, _1, fact, true),
+ failed);
}
}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Oct 21 11:29:44 2008
@@ -162,9 +162,9 @@
static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
switch (dir) {
- case Poller::IN: return ::EPOLLIN;
- case Poller::OUT: return ::EPOLLOUT;
- case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
+ case Poller::INPUT: return ::EPOLLIN;
+ case Poller::OUTPUT: return ::EPOLLOUT;
+ case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
default: return 0;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Oct 21 11:29:44 2008
@@ -26,8 +26,8 @@
#include "check.h"
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
+// TODO The basic algorithm here is not really POSIX specific and with a
+// bit more abstraction could (should) be promoted to be platform portable
#include <unistd.h>
#include <sys/socket.h>
#include <signal.h>
@@ -65,24 +65,55 @@
/*
* Asynch Acceptor
*/
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptorPrivate {
+public:
+ AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+ void start(Poller::shared_ptr poller);
+
+private:
+ void readable(DispatchHandle& handle);
+
+private:
+ AsynchAcceptor::Callback acceptedCallback;
+ DispatchHandle handle;
+ const Socket& socket;
+
+};
+
+}} // namespace qpid::sys
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+ impl(new AsynchAcceptorPrivate(s, callback))
+{}
+
+AsynchAcceptor::~AsynchAcceptor()
+{ delete impl;}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ impl->start(poller);
+}
+
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+ AsynchAcceptor::Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ handle(s, boost::bind(&AsynchAcceptorPrivate::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
ignoreSigpipe();
}
-void AsynchAcceptor::start(Poller::shared_ptr poller) {
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
/*
* We keep on accepting as long as there is something to accept
*/
-void AsynchAcceptor::readable(DispatchHandle& h) {
+void AsynchAcceptorPrivate::readable(DispatchHandle& h) {
Socket* s;
do {
errno = 0;
@@ -106,6 +137,36 @@
/*
* Asynch Connector
*/
+namespace qpid {
+namespace sys {
+namespace posix {
+
+/*
+ * POSIX version of AsynchIO TCP socket connector.
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector,
+ private DispatchHandle {
+
+private:
+ void connComplete(DispatchHandle& handle);
+ void failure(int, std::string);
+
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+
+public:
+ AsynchConnector(const Socket& socket,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+};
AsynchConnector::AsynchConnector(const Socket& s,
Poller::shared_ptr poller,
@@ -155,9 +216,85 @@
DispatchHandle::doDelete();
}
+} // namespace posix
+
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new qpid::sys::posix::AsynchConnector(s,
+ poller,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
/*
- * Asynch reader/writer
+ * POSIX version of AsynchIO reader/writer
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
*/
+namespace posix {
+
+class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle {
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ virtual void queueForDeletion();
+
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ~AsynchIO();
+
+ // Methods that are callback targets from Dispatcher.
+ void readable(DispatchHandle& handle);
+ void writeable(DispatchHandle& handle);
+ void disconnected(DispatchHandle& handle);
+ void close(DispatchHandle& handle);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
+};
+
AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
@@ -239,6 +376,10 @@
DispatchHandle::rewatchWrite();
}
+bool AsynchIO::writeQueueEmpty() {
+ return writeQueue.empty();
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
@@ -427,3 +568,17 @@
}
}
+} // namespace posix
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new qpid::sys::posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp?rev=706709&r1=706708&r2=706709&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp Tue Oct 21 11:29:44 2008
@@ -129,9 +129,9 @@
static uint32_t directionToPollEvent(Poller::Direction dir) {
switch (dir) {
- case Poller::IN: return POLLIN;
- case Poller::OUT: return POLLOUT;
- case Poller::INOUT: return POLLIN | POLLOUT;
+ case Poller::INPUT: return POLLIN;
+ case Poller::OUTPUT: return POLLOUT;
+ case Poller::INOUT: return POLLIN | POLLOUT;
default: return 0;
}
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,729 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+#include "IoHandlePrivate.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "check.h"
+
+#include <boost/thread/once.hpp>
+
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+#include <boost/bind.hpp>
+
+namespace {
+
+ typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock;
+
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.
+ */
+QPID_TSS int threadReadTotal = 0;
+QPID_TSS int threadMaxRead = 0;
+QPID_TSS int threadReadCount = 0;
+QPID_TSS int threadWriteTotal = 0;
+QPID_TSS int threadWriteCount = 0;
+QPID_TSS int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+
+/*
+ * The function pointers for AcceptEx and ConnectEx need to be looked up
+ * at run time. Make sure this is done only once.
+ */
+boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
+LPFN_ACCEPTEX fnAcceptEx = 0;
+typedef void (*lookUpFunc)(const qpid::sys::Socket &);
+
+void lookUpAcceptEx() {
+ SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ GUID guidAcceptEx = WSAID_ACCEPTEX;
+ DWORD dwBytes = 0;
+ WSAIoctl(h,
+ SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guidAcceptEx,
+ sizeof(guidAcceptEx),
+ &fnAcceptEx,
+ sizeof(fnAcceptEx),
+ &dwBytes,
+ NULL,
+ NULL);
+ closesocket(h);
+ if (fnAcceptEx == 0)
+ throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+}
+
+}
+
+namespace qpid {
+namespace sys {
+
+/*
+ * Asynch Acceptor
+ *
+ * This implementation uses knowledge that the DispatchHandle handle member
+ * is derived from PollerHandle, which has a reference to the Socket.
+ * No dispatching features of DispatchHandle are used - we just use the
+ * conduit to the Socket.
+ *
+ * AsynchAcceptor uses an AsynchAcceptResult object to track completion
+ * and status of each accept operation outstanding.
+ */
+
+class AsynchAcceptorPrivate {
+
+ friend class AsynchAcceptResult;
+
+public:
+ AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptorPrivate();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void restart(void);
+
+ AsynchAcceptor::Callback acceptedCallback;
+ const Socket& socket;
+};
+
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
+ impl(new AsynchAcceptorPrivate(s, callback))
+{}
+
+AsynchAcceptor::~AsynchAcceptor()
+{ delete impl; }
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ impl->start(poller);
+}
+
+AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
+ AsynchAcceptor::Callback callback)
+ : acceptedCallback(callback),
+ socket(s) {
+
+ s.setNonblocking();
+#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
+ boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
+#else
+ boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
+#endif
+}
+
+AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) {
+ socket.close();
+}
+
+void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
+ poller->addFd(PollerHandle(socket), Poller::INPUT);
+ restart ();
+}
+
+void AsynchAcceptor::restart(void) {
+ DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
+ AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
+ this,
+ toFd(socket.impl));
+ BOOL status;
+ status = ::fnAcceptEx(toFd(socket.impl),
+ toFd(result->newSocket->impl),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
+ QPID_WINDOWS_CHECK_ASYNC_START(status);
+}
+
+
+AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ SOCKET listener)
+ : callback(cb), acceptor(acceptor), listener(listener) {
+ newSocket.reset (new Socket());
+}
+
+void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
+ ::setsockopt (toFd(newSocket->impl),
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&listener,
+ sizeof (listener));
+ callback(*(newSocket.release()));
+ acceptor->restart ();
+ delete this;
+}
+
+void AsynchAcceptResult::failure(int status) {
+ if (status != WSA_OPERATION_ABORTED)
+ ;
+ delete this;
+}
+
+namespace windows {
+
+/*
+ * AsynchConnector does synchronous connects for now... to do asynch the
+ * IocpPoller will need some extension to register an event handle as a
+ * CONNECT-type "direction", the connect completion/result will need an
+ * event handle to associate with the connecting handle. But there's no
+ * time for that right now...
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+
+public:
+ AsynchConnector(const Socket& socket,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+};
+
+AsynchConnector::AsynchConnector(const Socket& sock,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+ : connCallback(connCb), failCallback(failCb), socket(sock) {
+ socket.setNonblocking();
+ try {
+ socket.connect(hostname, port);
+ connCallback(socket);
+ } catch(std::exception& e) {
+ if (failCallback)
+ failCallback(-1, std::string(e.what()));
+ socket.close();
+ delete &socket;
+ }
+}
+
+} // namespace windows
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ Poller::shared_ptr poller,
+ std::string hostname,
+ uint16_t port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new qpid::sys::windows::AsynchConnector(s,
+ poller,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
+
+/*
+ * Asynch reader/writer
+ */
+
+namespace windows {
+
+class AsynchIO : public qpid::sys::AsynchIO {
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+ ~AsynchIO();
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ /**
+ * Notify the object is should delete itself as soon as possible.
+ */
+ virtual void queueForDeletion();
+
+ /// Take any actions needed to prepare for working with the poller.
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+
+ /**
+ * getQueuedBuffer returns a buffer from the buffer queue, if one is
+ * available.
+ *
+ * @retval Pointer to BufferBase buffer; 0 if none is available.
+ */
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ Poller::shared_ptr poller;
+
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
+ * access to the buffer queue and write queue.
+ */
+ Mutex bufferQueueLock;
+
+ // Number of outstanding I/O operations.
+ volatile LONG opsInProgress;
+ // Is there a write in progress?
+ volatile bool writeInProgress;
+ // Deletion requested, but there are callbacks in progress.
+ volatile bool queuedDelete;
+ // Socket close requested, but there are operations in progress.
+ volatile bool queuedClose;
+
+private:
+ void close(void);
+
+ /**
+ * Initiate a read operation. AsynchIO::dispatchReadComplete() will be
+ * called when the read is complete and data is available.
+ */
+ virtual void startRead(void);
+
+ /**
+ * Initiate a write of the specified buffer. There's no callback for
+ * write completion to the AsynchIO object.
+ */
+ virtual void startWrite(AsynchIO::BufferBase* buff);
+
+ virtual bool writesNotComplete();
+
+ /**
+ * readComplete is called when a read request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void readComplete(AsynchReadResult *result);
+
+ /**
+ * writeComplete is called when a write request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void writeComplete(AsynchWriteResult *result);
+
+ /**
+ * Queue of completions to run. This queue enforces the requirement
+ * from upper layers that only one thread at a time is allowed to act
+ * on any given connection. Once a thread is busy processing a completion
+ * on this object, other threads that dispatch completions queue the
+ * completions here for the in-progress thread to handle when done.
+ * Thus, any threads can dispatch a completion from the IocpPoller, but
+ * this class ensures that actual processing at the connection level is
+ * only on one thread at a time.
+ */
+ std::queue<AsynchIoResult *> completionQueue;
+ volatile bool working;
+ Mutex completionLock;
+
+ /**
+ * Called when there's a completion to process.
+ */
+ void completion(AsynchIoResult *result);
+};
+
+AsynchIO::AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb) :
+
+ readCallback(rCb),
+ eofCallback(eofCb),
+ disCallback(disCb),
+ closedCallback(cCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ socket(s),
+ opsInProgress(0),
+ writeInProgress(false),
+ queuedDelete(false),
+ queuedClose(false),
+ working(false) {
+}
+
+struct deleter
+{
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+ std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+ std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+ queuedDelete = true;
+ if (opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ }
+ else {
+ delete this;
+ }
+}
+
+void AsynchIO::start(Poller::shared_ptr poller0) {
+ poller = poller0;
+ poller->addFd(PollerHandle(socket), Poller::INPUT);
+ if (writeQueue.size() > 0) // Already have data queued for write
+ notifyPendingWrite();
+ startRead();
+}
+
+void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ QLock l(bufferQueueLock);
+ bufferQueue.push_back(buff);
+}
+
+void AsynchIO::unread(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
+ QLock l(bufferQueueLock);
+ bufferQueue.push_front(buff);
+}
+
+void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ QLock l(bufferQueueLock);
+ writeQueue.push_back(buff);
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ IOHandlePrivate *hp =
+ new IOHandlePrivate (INVALID_SOCKET,
+ boost::bind(&AsynchIO::completion, this, _1));
+ IOHandle h(hp);
+ PollerHandle ph(h);
+ poller->addFd(ph, Poller::OUTPUT);
+}
+
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+bool AsynchIO::writeQueueEmpty() {
+ QLock l(bufferQueueLock);
+ return writeQueue.size() == 0;
+}
+
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ QLock l(bufferQueueLock);
+ // Always keep at least one buffer (it might have data that was
+ // "unread" in it).
+ if (bufferQueue.size() <= 1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) {
+ if (readCallback)
+ readCallback(*this, buffer);
+}
+
+void AsynchIO::notifyEof(void) {
+ if (eofCallback)
+ eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+ if (disCallback)
+ disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+ if (closedCallback)
+ closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+ if (emptyCallback)
+ emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+ if (idleCallback)
+ idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
+void AsynchIO::startRead(void) {
+ if (queuedDelete)
+ return;
+
+ // (Try to) get a buffer; look on the front since there may be an
+ // "unread" one there with data remaining from last time.
+ AsynchIO::BufferBase *buff = 0;
+ {
+ QLock l(bufferQueueLock);
+
+ if (!bufferQueue.empty()) {
+ buff = bufferQueue.front();
+ assert(buff);
+ bufferQueue.pop_front();
+ }
+ }
+ if (buff != 0) {
+ int readCount = buff->byteCount - buff->dataCount;
+ AsynchReadResult *result =
+ new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ readCount);
+ DWORD bytesReceived = 0, flags = 0;
+ InterlockedIncrement(&opsInProgress);
+ int status = WSARecv(toFd(socket.impl),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesReceived,
+ &flags,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error);
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ }
+ else {
+ notifyBuffersEmpty();
+ }
+ return;
+}
+
+void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
+ writeInProgress = true;
+ InterlockedIncrement(&opsInProgress);
+ int writeCount = buff->byteCount-buff->dataCount;
+ AsynchWriteResult *result =
+ new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ buff->dataCount);
+ DWORD bytesSent = 0;
+ int status = WSASend(toFd(socket.impl),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesSent,
+ 0,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error); // Also decrements in-progress count
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ return;
+}
+
+bool AsynchIO::writesNotComplete() {
+ return writeInProgress;
+}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(void) {
+ socket.close();
+ notifyClosed();
+}
+
+void AsynchIO::readComplete(AsynchReadResult *result) {
+ ++threadReadCount;
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ if (status == 0 && bytes > 0) {
+ threadReadTotal += bytes;
+ dispatchReadComplete(result->getBuff());
+ startRead();
+ }
+ else {
+ // No data read, so put the buffer back. It may be partially filled,
+ // so "unread" it back to the front of the queue.
+ unread(result->getBuff());
+ if (status == 0)
+ notifyEof();
+ else
+ notifyDisconnect();
+ }
+}
+
+/*
+ * NOTE - this completion is called for completed writes and also when
+ * a write is desired. The difference is in the buff - if a write is desired
+ * the buff is 0.
+ */
+void AsynchIO::writeComplete(AsynchWriteResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ AsynchIO::BufferBase *buff = result->getBuff();
+ if (buff != 0) {
+ ++threadWriteCount;
+ writeInProgress = false;
+ if (status == 0 && bytes > 0) {
+ threadWriteTotal += bytes;
+ if (bytes < result->getRequested()) // Still more to go; resubmit
+ startWrite(buff);
+ else
+ queueReadBuffer(buff); // All done; back to the pool
+ }
+ else {
+ // An error... if it's a connection close, ignore it - it will be
+ // noticed and handled on a read completion any moment now.
+ // What to do with real error??? Save the Buffer?
+ }
+ }
+
+ // If there are no writes outstanding, the priority is to write any
+ // remaining buffers first (either queued or via idle), then close the
+ // socket if that's queued.
+ // opsInProgress handled in completion()
+ if (!writeInProgress) {
+ bool writing = false;
+ {
+ QLock l(bufferQueueLock);
+ if (writeQueue.size() > 0) {
+ buff = writeQueue.front();
+ assert(buff);
+ writeQueue.pop_front();
+ startWrite(buff);
+ writing = true;
+ }
+ }
+ if (!writing) {
+ if (queuedClose)
+ close();
+ else
+ notifyIdle();
+ }
+ }
+ return;
+}
+
+void AsynchIO::completion(AsynchIoResult *result) {
+ {
+ ScopedLock<Mutex> l(completionLock);
+ if (working) {
+ completionQueue.push(result);
+ return;
+ }
+
+ // First thread in with something to do; note we're working then keep
+ // handling completions.
+ working = true;
+ while (result != 0) {
+ // New scope to unlock temporarily.
+ {
+ ScopedUnlock<Mutex> ul(completionLock);
+ AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
+ if (r != 0)
+ readComplete(r);
+ else {
+ AsynchWriteResult *w =
+ dynamic_cast<AsynchWriteResult*>(result);
+ writeComplete(w);
+ }
+ delete result;
+ result = 0;
+ InterlockedDecrement(&opsInProgress);
+ }
+ // Lock is held again.
+ if (completionQueue.empty())
+ continue;
+ result = completionQueue.front();
+ completionQueue.pop();
+ }
+ working = false;
+ }
+ // Lock released; ok to delete if all is done.
+ if (opsInProgress == 0 && queuedDelete)
+ delete this;
+}
+
+}} // namespace qpid::windows
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,187 @@
+#ifndef _windows_asynchIoResult_h
+#define _windows_asynchIoResult_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include <memory.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+namespace qpid {
+namespace sys {
+
+/*
+ * AsynchIoResult defines the class that receives the result of an
+ * asynchronous I/O operation, either send/recv or accept/connect.
+ *
+ * Operation factories should set one of these up before beginning the
+ * operation. Poller knows how to dispatch completion to this class.
+ * This class must be subclassed for needed operations; this class provides
+ * an interface only and cannot be instantiated.
+ *
+ * This class is tied to Windows; it inherits from OVERLAPPED so that the
+ * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call
+ * the completion handler.
+ */
+class AsynchResult : private OVERLAPPED {
+public:
+ LPOVERLAPPED overlapped(void) { return this; }
+ static AsynchResult* from_overlapped(LPOVERLAPPED ol) {
+ return static_cast<AsynchResult*>(ol);
+ }
+ virtual void success (size_t bytesTransferred) {
+ bytes = bytesTransferred;
+ status = 0;
+ complete();
+ }
+ virtual void failure (int error) {
+ bytes = 0;
+ status = error;
+ complete();
+ }
+ size_t getTransferred(void) const { return bytes; }
+ int getStatus(void) const { return status; }
+
+protected:
+ AsynchResult() : bytes(0), status(0)
+ { memset(overlapped(), 0, sizeof(OVERLAPPED)); }
+ ~AsynchResult() {}
+ virtual void complete(void) = 0;
+
+ size_t bytes;
+ int status;
+};
+
+class AsynchAcceptorPrivate;
+class AsynchAcceptResult : public AsynchResult {
+
+ friend class AsynchAcceptorPrivate;
+
+public:
+ AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptorPrivate *acceptor,
+ SOCKET listener);
+ virtual void success (size_t bytesTransferred);
+ virtual void failure (int error);
+
+private:
+ virtual void complete(void) {} // No-op for this class.
+
+ std::auto_ptr<qpid::sys::Socket> newSocket;
+ AsynchAcceptor::Callback callback;
+ AsynchAcceptorPrivate *acceptor;
+ SOCKET listener;
+
+ // AcceptEx needs a place to write the local and remote addresses
+ // when accepting the connection. Place those here; get enough for
+ // IPv6 addresses, even if the socket is IPv4.
+ enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+ SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
+ char addressBuffer[SOCKADDRBUFLEN];
+};
+
+class AsynchIO;
+
+class AsynchIoResult : public AsynchResult {
+public:
+ typedef boost::function1<void, AsynchIoResult *> Completer;
+
+ virtual ~AsynchIoResult() {}
+ AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+ size_t getRequested(void) const { return requested; }
+ const WSABUF *getWSABUF(void) const { return &wsabuf; }
+
+protected:
+ void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+
+protected:
+ AsynchIoResult(Completer cb,
+ AsynchIO::BufferBase *buff, size_t length)
+ : completionCallback(cb), iobuff(buff), requested(length) {}
+
+ virtual void complete(void) = 0;
+ WSABUF wsabuf;
+ Completer completionCallback;
+
+private:
+ AsynchIO::BufferBase *iobuff;
+ size_t requested; // Number of bytes in original I/O request
+};
+
+class AsynchReadResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ getBuff()->dataCount += bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchReadResult(AsynchIoResult::Completer cb,
+ AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff->bytes + buff->dataCount;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ AsynchIO::BufferBase *b = getBuff();
+ b->dataStart += bytes;
+ b->dataCount -= bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteResult(AsynchIoResult::Completer cb,
+ AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff ? buff->bytes : 0;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteWanted : public AsynchWriteResult {
+
+ // complete() just does completion callback; no buffers used.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteWanted(AsynchIoResult::Completer cb)
+ : AsynchWriteResult(cb, 0, 0) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+};
+
+}}
+
+#endif /*!_windows_asynchIoResult_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+#include "IoHandlePrivate.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+SOCKET toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Tue Oct 21 11:29:44 2008
@@ -0,0 +1,52 @@
+#ifndef _sys_windows_IoHandlePrivate_h
+#define _sys_windows_IoHandlePrivate_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+// Private fd related implementation details
+// There should be either a valid socket handle or a completer callback.
+// Handle is used to associate with poller's iocp; completer is used to
+// inject a completion that will very quickly trigger a callback to the
+// completer from an I/O thread.
+class IOHandlePrivate {
+public:
+ IOHandlePrivate(SOCKET f = INVALID_SOCKET,
+ AsynchIoResult::Completer cb = 0) :
+ fd(f), event(cb)
+ {}
+
+ SOCKET fd;
+ AsynchIoResult::Completer event;
+};
+
+SOCKET toFd(const IOHandlePrivate* h);
+
+}}
+
+#endif /* _sys_windows_IoHandlePrivate_h */
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp?rev=706709&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp Tue Oct 21 11:29:44 2008
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Dispatcher.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+Dispatcher::Dispatcher(Poller::shared_ptr poller0) :
+ poller(poller0) {
+}
+
+Dispatcher::~Dispatcher() {
+}
+
+void Dispatcher::run() {
+ do {
+ Poller::Event event = poller->wait();
+
+ // Handle shutdown
+ switch (event.type) {
+ case Poller::SHUTDOWN:
+ return;
+ break;
+ case Poller::INVALID: // On any type of success or fail completion
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ } while (true);
+}
+
+}}