You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/01/07 00:42:19 UTC
svn commit: r732177 - in /qpid/trunk/qpid/cpp/src: qpid/sys/ qpid/sys/epoll/
qpid/sys/posix/ qpid/sys/windows/ tests/
Author: astitcher
Date: Tue Jan 6 15:42:18 2009
New Revision: 732177
URL: http://svn.apache.org/viewvc?rev=732177&view=rev
Log:
Work on the low level IO code:
* Introduce code so that you can interrupt waiting for a handle and receive
a callback that is correctly serialised with the IO callbacks for that
handle
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Tue Jan 6 15:42:18 2009
@@ -114,6 +114,7 @@
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function1<void, AsynchIO&> RequestCallback;
// Call create() to allocate a new AsynchIO object with the specified
// callbacks. This method is implemented in platform-specific code to
@@ -138,6 +139,7 @@
virtual void queueWriteClose() = 0;
virtual bool writeQueueEmpty() = 0;
virtual void startReading() = 0;
+ virtual void requestCallback(RequestCallback) = 0;
virtual BufferBase* getQueuedBuffer() = 0;
protected:
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Jan 6 15:42:18 2009
@@ -270,22 +270,30 @@
case IDLE:
case DELAYED_IDLE:
case DELAYED_DELETE:
- return;
+ return;
case DELAYED_R:
case DELAYED_W:
case DELAYED_RW:
case DELAYED_INACTIVE:
- state = DELAYED_IDLE;
- break;
+ state = DELAYED_IDLE;
+ break;
default:
- state = IDLE;
- break;
+ state = IDLE;
+ break;
}
assert(poller);
poller->delFd(*this);
poller.reset();
}
+void DispatchHandle::call(Callback iCb) {
+ assert(iCb);
+ ScopedLock<Mutex> lock(stateLock);
+ interruptedCallbacks.push(iCb);
+
+ (void) poller->interrupt(*this);
+}
+
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
@@ -302,9 +310,9 @@
state = DELAYED_DELETE;
return;
case IDLE:
- break;
+ break;
default:
- // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+ // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
assert(false);
}
}
@@ -368,14 +376,29 @@
disconnectedCallback(*this);
}
break;
+ case Poller::INTERRUPTED:
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ assert(interruptedCallbacks.size() > 0);
+ // We'll actually do the interrupt below
+ }
+ break;
default:
assert(false);
}
- // If any of the callbacks re-enabled reading/writing then actually
- // do it now
{
ScopedLock<Mutex> lock(stateLock);
+ // If we've got a pending interrupt do it now
+ while (interruptedCallbacks.size() > 0) {
+ Callback cb = interruptedCallbacks.front();
+ assert(cb);
+ cb(*this);
+ interruptedCallbacks.pop();
+ }
+
+ // If any of the callbacks re-enabled reading/writing then actually
+ // do it now
switch (state) {
case DELAYED_R:
poller->modFd(*this, Poller::INPUT);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Jan 6 15:42:18 2009
@@ -27,6 +27,7 @@
#include <boost/function.hpp>
+#include <queue>
namespace qpid {
namespace sys {
@@ -53,11 +54,13 @@
friend class DispatchHandleRef;
public:
typedef boost::function1<void, DispatchHandle&> Callback;
+ typedef std::queue<Callback> CallbackQueue;
private:
Callback readableCallback;
Callback writableCallback;
Callback disconnectedCallback;
+ CallbackQueue interruptedCallbacks;
Poller::shared_ptr poller;
Mutex stateLock;
enum {
@@ -92,12 +95,12 @@
/** Add this DispatchHandle to the poller to be watched. */
void startWatch(Poller::shared_ptr poller);
- /** Resume watchingn for all non-0 callbacks. */
+ /** Resume watching for all non-0 callbacks. */
void rewatch();
- /** Resume watchingn for read only. */
+ /** Resume watching for read only. */
void rewatchRead();
- /** Resume watchingn for write only. */
+ /** Resume watching for write only. */
void rewatchWrite();
/** Stop watching temporarily. The DispatchHandle remains
@@ -112,6 +115,11 @@
/** Stop watching permanently. Disassociates from the poller. */
void stopWatch();
+ /** Interrupt watching this handle and make a serialised callback that respects the
+ * same exclusivity guarantees as the other callbacks
+ */
+ void call(Callback iCb);
+
protected:
/** Override to get extra processing done when the DispatchHandle is deleted. */
void doDelete();
@@ -139,6 +147,7 @@
void unwatchRead() { ref->unwatchRead(); }
void unwatchWrite() { ref->unwatchWrite(); }
void stopWatch() { ref->stopWatch(); }
+ void call(Callback iCb) { ref->call(iCb); }
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Tue Jan 6 15:42:18 2009
@@ -34,26 +34,7 @@
}
void Dispatcher::run() {
- do {
- Poller::Event event = poller->wait();
-
- // If can read/write then dispatch appropriate callbacks
- if (event.handle) {
- event.process();
- } else {
- // Handle shutdown
- switch (event.type) {
- case Poller::SHUTDOWN:
- goto dispatcher_shutdown;
- default:
- // This should be impossible
- assert(false);
- }
- }
- } while (true);
-
-dispatcher_shutdown:
- ;
+ poller->run();
}
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Jan 6 15:42:18 2009
@@ -23,6 +23,7 @@
*/
#include "Time.h"
+#include "Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -37,7 +38,7 @@
*/
class PollerHandle;
class PollerPrivate;
-class Poller {
+class Poller : public Runnable {
PollerPrivate* const impl;
public:
@@ -57,7 +58,8 @@
READ_WRITABLE,
DISCONNECTED,
SHUTDOWN,
- TIMEOUT
+ TIMEOUT,
+ INTERRUPTED
};
struct Event {
@@ -76,6 +78,20 @@
~Poller();
/** Note: this function is async-signal safe */
void shutdown();
+
+ // Interrupt waiting for a specific poller handle.
+ // Returns true if we could interrupt the handle
+ // - in this case, on return the handle is no longer being monitored,
+ // but we will receive an event from some invocation of Poller::wait
+ // with the handle and the INTERRUPTED event type.
+ // Returns false if the handle is not being monitored by the poller
+ // - this can be either because it has just received an event which has been
+ // reported and it has not been re-enabled since, or because it was removed
+ // from the monitoring set.
+ bool interrupt(PollerHandle& handle);
+
+ // Poller run loop
+ void run();
void addFd(PollerHandle& handle, Direction dir);
void delFd(PollerHandle& handle);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Jan 6 15:42:18 2009
@@ -28,9 +28,10 @@
#include <sys/epoll.h>
#include <errno.h>
+#include <signal.h>
#include <assert.h>
-#include <vector>
+#include <queue>
#include <exception>
namespace qpid {
@@ -58,14 +59,12 @@
int fd;
::__uint32_t events;
- PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
+ PollerHandlePrivate(int f) :
fd(f),
events(0),
- pollerHandle(p),
stat(ABSENT) {
}
@@ -112,7 +111,7 @@
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(toFd(h.impl)))
{}
PollerHandle::~PollerHandle() {
@@ -161,9 +160,47 @@
};
static ReadablePipe alwaysReadable;
+ static int alwaysReadableFd;
+ class InterruptHandle: public PollerHandle {
+ std::queue<PollerHandle*> handles;
+
+ void processEvent(Poller::EventType) {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ assert(handle);
+
+ // Synthesise event
+ Poller::Event event(handle, Poller::INTERRUPTED);
+
+ // Process synthesised event
+ event.process();
+ }
+
+ public:
+ InterruptHandle() :
+ PollerHandle(DummyIOHandle)
+ {}
+
+ void addHandle(PollerHandle& h) {
+ handles.push(&h);
+ }
+
+ PollerHandle* getHandle() {
+ PollerHandle* handle = handles.front();
+ handles.pop();
+ return handle;
+ }
+
+ bool queuedHandles() {
+ return handles.size() > 0;
+ }
+ };
+
const int epollFd;
bool isShutdown;
+ InterruptHandle interruptHandle;
+ ::sigset_t sigMask;
static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
switch (dir) {
@@ -193,15 +230,41 @@
epollFd(::epoll_create(DefaultFds)),
isShutdown(false) {
QPID_POSIX_CHECK(epollFd);
+ ::sigemptyset(&sigMask);
+ // Add always readable fd into our set (but not listening to it yet)
+ ::epoll_event epe;
+ epe.events = 0;
+ epe.data.u64 = 0;
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));
}
~PollerPrivate() {
// It's probably okay to ignore any errors here as there can't be data loss
::close(epollFd);
}
+
+ void interrupt(bool all=false) {
+ ::epoll_event epe;
+ if (all) {
+ // Not EPOLLONESHOT, so we eventually get all threads
+ epe.events = ::EPOLLIN;
+ epe.data.u64 = 0; // Keep valgrind happy
+ } else {
+ // Use EPOLLONESHOT so we only wake a single thread
+ epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
+ }
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
+ }
+
+ void interruptAll() {
+ interrupt(true);
+ }
};
PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
void Poller::addFd(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
@@ -218,7 +281,7 @@
epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
}
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
@@ -249,7 +312,7 @@
::epoll_event epe;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -266,7 +329,7 @@
::epoll_event epe;
epe.events = eh.events;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
+ epe.data.ptr = &handle;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -281,28 +344,85 @@
if (impl->isShutdown)
return;
- // Don't use any locking here - isshutdown will be visible to all
+ // Don't use any locking here - isShutdown will be visible to all
// after the epoll_ctl() anyway (it's a memory barrier)
impl->isShutdown = true;
- // Add always readable fd to epoll (not EPOLLONESHOT)
- int fd = impl->alwaysReadable.getFD();
- ::epoll_event epe;
- epe.events = ::EPOLLIN;
- epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now
- epe.data.ptr = 0;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+ impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+ {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ if (eh.isInactive()) {
+ return false;
+ }
+ ::epoll_event epe;
+ epe.events = 0;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &eh;
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ eh.setInactive();
+ }
+
+ PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+ PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
+ ScopedLock<Mutex> l(eh.lock);
+ ih.addHandle(handle);
+
+ impl->interrupt();
+ eh.setActive();
+ return true;
+}
+
+void Poller::run() {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+ } while (true);
}
Poller::Event Poller::wait(Duration timeout) {
epoll_event epe;
int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+ AbsTime targetTimeout =
+ (timeout == TIME_INFINITE) ?
+ FAR_FUTURE :
+ AbsTime(now(), timeout);
- // Repeat until we weren't interupted
+ // Repeat until we weren't interrupted by signal
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
+ // Need to run on kernels without epoll_pwait()
+ // - fortunately in this case we don't really need the atomicity of epoll_pwait()
+#if 1
+ sigset_t os;
+ pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os);
int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
-
+ pthread_sigmask(SIG_SETMASK, &os, 0);
+#else
+ int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
+#endif
+ // Check for shutdown
if (impl->isShutdown) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, SHUTDOWN);
@@ -312,13 +432,27 @@
QPID_POSIX_CHECK(rc);
} else if (rc > 0) {
assert(rc == 1);
- PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr);
+ PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+ PollerHandlePrivate& eh = *handle->impl;
ScopedLock<Mutex> l(eh.lock);
-
+
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
- PollerHandle* handle = eh.pollerHandle;
+
+ // Check if this is an interrupt
+ if (handle == &impl->interruptHandle) {
+ PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
+ // If there is an interrupt queued behind this one we need to arm it
+ // We do it this way so that another thread can pick it up
+ if (impl->interruptHandle.queuedHandles()) {
+ impl->interrupt();
+ eh.setActive();
+ } else {
+ eh.setInactive();
+ }
+ return Event(wrappedHandle, INTERRUPTED);
+ }
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
@@ -349,10 +483,12 @@
// The only things we can do here are return a timeout or wait more.
// Obviously if we timed out we return timeout; if the wait was meant to
// be indefinite then we should never return with a time out so we go again.
- // If the wait wasn't indefinite, but we were interrupted then we have to return
- // with a timeout as we don't know how long we've waited so far and so we can't
- // continue the wait.
- if (rc == 0 || timeoutMs != -1) {
+ // If the wait wasn't indefinite, we check whether we are after the target wait
+ // time or not
+ if (timeoutMs == -1) {
+ continue;
+ }
+ if (rc == 0 && now() > targetTimeout) {
PollerHandleDeletionManager.markAllUnusedInThisThread();
return Event(0, TIMEOUT);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Jan 6 15:42:18 2009
@@ -266,6 +266,7 @@
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
virtual void startReading();
+ virtual void requestCallback(RequestCallback);
virtual BufferBase* getQueuedBuffer();
private:
@@ -275,6 +276,7 @@
void readable(DispatchHandle& handle);
void writeable(DispatchHandle& handle);
void disconnected(DispatchHandle& handle);
+ void requestedCall(RequestCallback);
void close(DispatchHandle& handle);
private:
@@ -386,6 +388,18 @@
DispatchHandle::rewatchRead();
}
+void AsynchIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback));
+}
+
+void AsynchIO::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp Tue Jan 6 15:42:18 2009
@@ -31,6 +31,8 @@
return h->fd;
}
+NullIOHandle DummyIOHandle;
+
IOHandle::IOHandle(IOHandlePrivate* h) :
impl(h)
{}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h Tue Jan 6 15:42:18 2009
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/Time.h"
+#include "qpid/sys/IOHandle.h"
struct timespec;
struct timeval;
@@ -47,6 +48,25 @@
int toFd(const IOHandlePrivate* h);
+// Posix fd as an IOHandle
+class PosixIOHandle : public IOHandle {
+public:
+ PosixIOHandle(int fd) :
+ IOHandle(new IOHandlePrivate(fd))
+ {}
+};
+
+// Dummy IOHandle for places it's required in the API
+// but we promise not to actually try to do any operations on the IOHandle
+class NullIOHandle : public IOHandle {
+public:
+ NullIOHandle() :
+ IOHandle(new IOHandlePrivate)
+ {}
+};
+
+extern NullIOHandle DummyIOHandle;
+
}}
#endif /*!_sys_posix_PrivatePosix_h*/
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Jan 6 15:42:18 2009
@@ -284,6 +284,7 @@
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
virtual void startReading();
+ virtual void requestCallback(RequestCallback);
/**
* getQueuedBuffer returns a buffer from the buffer queue, if one is
@@ -531,6 +532,11 @@
return;
}
+// TODO: This needs to arrange for a callback that is serialised with
+// the other IO callbacks for this AsynchIO
+void AsynchIO::requestCallback(RequestCallback callback) {
+}
+
/**
* Return a queued buffer if there are enough to spare.
*/
Modified: qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Tue Jan 6 15:42:18 2009
@@ -20,7 +20,10 @@
*/
#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/sys/Thread.h"
#include <sys/types.h>
@@ -28,6 +31,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
+#include <signal.h>
#include <iostream>
#include <boost/bind.hpp>
@@ -74,7 +78,26 @@
h.rewatch();
}
-int main(int argc, char** argv)
+DispatchHandle* rh = 0;
+DispatchHandle* wh = 0;
+
+void rInterrupt(DispatchHandle&) {
+ cerr << "R";
+}
+
+void wInterrupt(DispatchHandle&) {
+ cerr << "W";
+}
+
+DispatchHandle::Callback rcb = rInterrupt;
+DispatchHandle::Callback wcb = wInterrupt;
+
+void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) {
+ rh->call(rcb);
+ wh->call(wcb);
+}
+
+int main(int /*argc*/, char** /*argv*/)
{
// Create poller
Poller::shared_ptr poller(new Poller);
@@ -82,12 +105,12 @@
// Create dispatcher thread
Dispatcher d(poller);
Dispatcher d1(poller);
- //Dispatcher d2(poller);
- //Dispatcher d3(poller);
+ Dispatcher d2(poller);
+ Dispatcher d3(poller);
Thread dt(d);
Thread dt1(d1);
- //Thread dt2(d2);
- //Thread dt3(d3);
+ Thread dt2(d2);
+ Thread dt3(d3);
// Setup sender and receiver
int sv[2];
@@ -106,22 +129,58 @@
for (int i = 0; i < 8; i++)
testString += testString;
- DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0);
- DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString));
+ PosixIOHandle f0(sv[0]);
+ PosixIOHandle f1(sv[1]);
- rh.watch(poller);
- wh.watch(poller);
+ rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0);
+ wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);
- // wait 2 minutes then shutdown
- sleep(60);
+ rh->startWatch(poller);
+ wh->startWatch(poller);
+
+ // Set up a regular itimer interrupt
+
+ // Ignore signal in this thread
+ ::sigset_t sm;
+ ::sigemptyset(&sm);
+ ::sigaddset(&sm, SIGRTMIN);
+ ::pthread_sigmask(SIG_BLOCK, &sm, 0);
+
+ // Signal handling
+ struct ::sigaction sa;
+ sa.sa_sigaction = timer_handler;
+ sa.sa_flags = SA_RESTART | SA_SIGINFO;
+ ::sigemptyset(&sa.sa_mask);
+ rc = ::sigaction(SIGRTMIN, &sa,0);
+ assert(rc == 0);
+
+ ::sigevent se;
+ se.sigev_notify = SIGEV_SIGNAL;
+ se.sigev_signo = SIGRTMIN;
+ se.sigev_value.sival_ptr = 0;
+ timer_t timer;
+ rc = ::timer_create(CLOCK_REALTIME, &se, &timer);
+ assert(rc == 0);
+
+ itimerspec ts = {
+ /*.it_value = */ {2, 0}, // s, ns
+ /*.it_interval = */ {2, 0}}; // s, ns
+ rc = ::timer_settime(timer, 0, &ts, 0);
+ assert(rc == 0);
+
+ // wait 1 minute then shutdown
+ ::sleep(60);
+
+ rc = ::timer_delete(timer);
+ assert(rc == 0);
poller->shutdown();
dt.join();
dt1.join();
- //dt2.join();
- //dt3.join();
+ dt2.join();
+ dt3.join();
- cout << "Wrote: " << writtenBytes << "\n";
+ cout << "\nWrote: " << writtenBytes << "\n";
cout << "Read: " << readBytes << "\n";
return 0;
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan 6 15:42:18 2009
@@ -187,6 +187,13 @@
sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h
sender_LDADD=$(lib_client)
+check_PROGRAMS+=PollerTest
+PollerTest_SOURCES=PollerTest.cpp
+PollerTest_LDADD=$(lib_common)
+
+check_PROGRAMS+=DispatcherTest
+DispatcherTest_SOURCES=DispatcherTest.cpp
+DispatcherTest_LDADD=$(lib_common)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue Jan 6 15:42:18 2009
@@ -23,7 +23,9 @@
* Use socketpair to test the poller
*/
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/posix/PrivatePosix.h"
#include <string>
#include <iostream>
@@ -67,7 +69,7 @@
return bytesRead;
}
-int main(int argc, char** argv)
+int main(int /*argc*/, char** /*argv*/)
{
try
{
@@ -103,15 +105,18 @@
auto_ptr<Poller> poller(new Poller);
- PollerHandle h0(sv[0]);
- PollerHandle h1(sv[1]);
+ PosixIOHandle f0(sv[0]);
+ PosixIOHandle f1(sv[1]);
+
+ PollerHandle h0(f0);
+ PollerHandle h1(f1);
poller->addFd(h0, Poller::INOUT);
- // Wait for 500ms - h0 should be writable
+ // h0 should be writable
Poller::Event event = poller->wait();
assert(event.handle == &h0);
- assert(event.dir == Poller::OUT);
+ assert(event.type == Poller::WRITABLE);
// Write as much as we can to socket 0
bytesWritten = writeALot(sv[0], testString);
@@ -126,17 +131,48 @@
poller->addFd(h1, Poller::INOUT);
event = poller->wait();
assert(event.handle == &h1);
- assert(event.dir == Poller::INOUT);
+ assert(event.type == Poller::READ_WRITABLE);
+
+ // Can't interrupt, it's not active
+ assert(poller->interrupt(h1) == false);
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
cout << "Read(1): " << bytesRead << " bytes\n";
+
+ // Test poller interrupt
+ assert(poller->interrupt(h0) == true);
+ event = poller->wait();
+ assert(event.handle == &h0);
+ assert(event.type == Poller::INTERRUPTED);
+ assert(poller->interrupt(h0) == false);
+
+ // Test multiple interrupts
+ poller->rearmFd(h0);
+ poller->rearmFd(h1);
+ assert(poller->interrupt(h0) == true);
+ assert(poller->interrupt(h1) == true);
+
+ // Make sure we can't interrupt them again
+ assert(poller->interrupt(h0) == false);
+ assert(poller->interrupt(h1) == false);
+ // Make sure that they both come out in the correct order
+ event = poller->wait();
+ assert(event.handle == &h0);
+ assert(event.type == Poller::INTERRUPTED);
+ assert(poller->interrupt(h0) == false);
+ event = poller->wait();
+ assert(event.handle == &h1);
+ assert(event.type == Poller::INTERRUPTED);
+ assert(poller->interrupt(h1) == false);
+
// At this point h1 should have been disabled from the poller
// (as it was just returned) and h0 can write again
+ poller->rearmFd(h0);
event = poller->wait();
assert(event.handle == &h0);
- assert(event.dir == Poller::OUT);
+ assert(event.type == Poller::WRITABLE);
// Now both the handles should be disabled
event = poller->wait(500000000);
@@ -146,11 +182,11 @@
poller->shutdown();
event = poller->wait();
assert(event.handle == 0);
- assert(event.dir == Poller::SHUTDOWN);
+ assert(event.type == Poller::SHUTDOWN);
event = poller->wait();
assert(event.handle == 0);
- assert(event.dir == Poller::SHUTDOWN);
+ assert(event.type == Poller::SHUTDOWN);
poller->delFd(h1);
poller->delFd(h0);