You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/01/07 00:42:19 UTC

svn commit: r732177 - in /qpid/trunk/qpid/cpp/src: qpid/sys/ qpid/sys/epoll/ qpid/sys/posix/ qpid/sys/windows/ tests/

Author: astitcher
Date: Tue Jan  6 15:42:18 2009
New Revision: 732177

URL: http://svn.apache.org/viewvc?rev=732177&view=rev
Log:
Work on the low level IO code:
* Introduce code so that you can interrupt waiting for a handle and receive
  a callback that is correctly serialised with the IO callbacks for that
  handle

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
    qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Tue Jan  6 15:42:18 2009
@@ -114,6 +114,7 @@
     typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
     typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
     typedef boost::function1<void, AsynchIO&> IdleCallback;
+    typedef boost::function1<void, AsynchIO&> RequestCallback;
 
     // Call create() to allocate a new AsynchIO object with the specified
     // callbacks. This method is implemented in platform-specific code to
@@ -138,6 +139,7 @@
     virtual void queueWriteClose() = 0;
     virtual bool writeQueueEmpty() = 0;
     virtual void startReading() = 0;
+    virtual void requestCallback(RequestCallback) = 0;
     virtual BufferBase* getQueuedBuffer() = 0;
 
 protected:

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Jan  6 15:42:18 2009
@@ -270,22 +270,30 @@
     case IDLE:
     case DELAYED_IDLE:
     case DELAYED_DELETE:
-    	return;
+        return;
     case DELAYED_R:
     case DELAYED_W:
     case DELAYED_RW:
     case DELAYED_INACTIVE:
-    	state = DELAYED_IDLE;
-    	break;
+        state = DELAYED_IDLE;
+        break;
     default:
-	    state = IDLE;
-	    break;
+        state = IDLE;
+        break;
     }
     assert(poller);
     poller->delFd(*this);
     poller.reset();
 }
 
+void DispatchHandle::call(Callback iCb) {
+    assert(iCb);
+    ScopedLock<Mutex> lock(stateLock);
+    interruptedCallbacks.push(iCb);
+    
+    (void) poller->interrupt(*this);
+}
+
 // The slightly strange switch structure
 // is to ensure that the lock is released before
 // we do the delete
@@ -302,9 +310,9 @@
         state = DELAYED_DELETE;
         return;
     case IDLE:
-    	break;
+        break;
     default:
-    	// Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+        // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
         assert(false);
     }
     }
@@ -368,14 +376,29 @@
             disconnectedCallback(*this);
         }
         break;
+    case Poller::INTERRUPTED:
+        {
+        ScopedLock<Mutex> lock(stateLock);
+        assert(interruptedCallbacks.size() > 0);
+        // We'll actually do the interrupt below
+        }
+        break;
     default:
         assert(false);
     }
 
-    // If any of the callbacks re-enabled reading/writing then actually
-    // do it now
     {
     ScopedLock<Mutex> lock(stateLock);
+    // If we've got a pending interrupt do it now
+    while (interruptedCallbacks.size() > 0) {
+        Callback cb = interruptedCallbacks.front();
+        assert(cb);
+        cb(*this);
+        interruptedCallbacks.pop();
+    }
+
+    // If any of the callbacks re-enabled reading/writing then actually
+    // do it now
     switch (state) {
     case DELAYED_R:
         poller->modFd(*this, Poller::INPUT);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Jan  6 15:42:18 2009
@@ -27,6 +27,7 @@
 
 #include <boost/function.hpp>
 
+#include <queue>
 
 namespace qpid {
 namespace sys {
@@ -53,11 +54,13 @@
     friend class DispatchHandleRef;
 public:
     typedef boost::function1<void, DispatchHandle&> Callback;
+    typedef std::queue<Callback> CallbackQueue;
 
 private:
     Callback readableCallback;
     Callback writableCallback;
     Callback disconnectedCallback;
+    CallbackQueue interruptedCallbacks;
     Poller::shared_ptr poller;
     Mutex stateLock;
     enum {
@@ -92,12 +95,12 @@
     /** Add this DispatchHandle to the poller to be watched. */
     void startWatch(Poller::shared_ptr poller);
 
-    /** Resume watchingn for all non-0 callbacks. */
+    /** Resume watching for all non-0 callbacks. */
     void rewatch();
-    /** Resume watchingn for read only. */
+    /** Resume watching for read only. */
     void rewatchRead();
 
-    /** Resume watchingn for write only. */
+    /** Resume watching for write only. */
     void rewatchWrite();
 
     /** Stop watching temporarily. The DispatchHandle remains
@@ -112,6 +115,11 @@
     /** Stop watching permanently. Disassociates from the poller. */
     void stopWatch();
     
+    /** Interrupt watching this handle and make a serialised callback that respects the
+     * same exclusivity guarantees as the other callbacks
+     */
+    void call(Callback iCb);
+
 protected:
     /** Override to get extra processing done when the DispatchHandle is deleted. */
     void doDelete();
@@ -139,6 +147,7 @@
     void unwatchRead() { ref->unwatchRead(); }
     void unwatchWrite() { ref->unwatchWrite(); }
     void stopWatch() { ref->stopWatch(); }
+    void call(Callback iCb) { ref->call(iCb); }
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Tue Jan  6 15:42:18 2009
@@ -34,26 +34,7 @@
 }
     
 void Dispatcher::run() {
-    do {
-        Poller::Event event = poller->wait();
-
-        // If can read/write then dispatch appropriate callbacks        
-        if (event.handle) {
-            event.process();
-        } else {
-            // Handle shutdown
-            switch (event.type) {
-            case Poller::SHUTDOWN:
-                goto dispatcher_shutdown;
-            default:
-                // This should be impossible
-                assert(false);
-            }
-        }
-    } while (true);
-    
-dispatcher_shutdown:
-    ;
+	poller->run();
 }
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Tue Jan  6 15:42:18 2009
@@ -23,6 +23,7 @@
  */
 
 #include "Time.h"
+#include "Runnable.h"
 
 #include <boost/shared_ptr.hpp>
 
@@ -37,7 +38,7 @@
  */
 class PollerHandle;
 class PollerPrivate;
-class Poller {
+class Poller : public Runnable {
     PollerPrivate* const impl;
 
 public:
@@ -57,7 +58,8 @@
         READ_WRITABLE,
         DISCONNECTED,
         SHUTDOWN,
-        TIMEOUT
+        TIMEOUT,
+        INTERRUPTED
     };
 
     struct Event {
@@ -76,6 +78,20 @@
     ~Poller();
     /** Note: this function is async-signal safe */
     void shutdown();
+    
+    // Interrupt waiting for a specific poller handle.
+    // Returns true if we could interrupt the handle:
+    // - in this case, on return the handle is no longer being monitored,
+    //   but we will receive an event from some invocation of Poller::wait()
+    //   with the handle and the INTERRUPTED event type.
+    // Returns false if the handle is not being monitored by the poller:
+    // - this can be either because it has just received an event which has
+    //   been reported and it has not been re-enabled since, or because it was
+    //   removed from the monitoring set.
+    bool interrupt(PollerHandle& handle);
+    
+    // Poller run loop
+    void run();
 
     void addFd(PollerHandle& handle, Direction dir);
     void delFd(PollerHandle& handle);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Jan  6 15:42:18 2009
@@ -28,9 +28,10 @@
 
 #include <sys/epoll.h>
 #include <errno.h>
+#include <signal.h>
 
 #include <assert.h>
-#include <vector>
+#include <queue>
 #include <exception>
 
 namespace qpid {
@@ -58,14 +59,12 @@
 
     int fd;
     ::__uint32_t events;
-    PollerHandle* pollerHandle;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate(int f, PollerHandle* p) :
+    PollerHandlePrivate(int f) :
       fd(f),
       events(0),
-      pollerHandle(p),
       stat(ABSENT) {
     }
 
@@ -112,7 +111,7 @@
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl), this))
+    impl(new PollerHandlePrivate(toFd(h.impl)))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -161,9 +160,47 @@
     };
 
     static ReadablePipe alwaysReadable;
+    static int alwaysReadableFd;
 
+    class InterruptHandle: public PollerHandle {
+    	std::queue<PollerHandle*> handles;
+    	
+    	void processEvent(Poller::EventType) {
+    		PollerHandle* handle = handles.front();
+    		handles.pop();
+    		assert(handle);
+    		
+    		// Synthesise event
+    		Poller::Event event(handle, Poller::INTERRUPTED);
+
+    		// Process synthesised event
+    		event.process();
+    	}
+
+    public:
+    	InterruptHandle() :
+    		PollerHandle(DummyIOHandle)
+    	{}
+    	
+    	void addHandle(PollerHandle& h) {
+    		handles.push(&h);
+    	}
+    	
+    	PollerHandle* getHandle() {
+    		PollerHandle* handle = handles.front();
+    		handles.pop();
+    		return handle;
+    	}
+    	
+    	bool queuedHandles() {
+    		return handles.size() > 0;
+    	}
+    };
+    
     const int epollFd;
     bool isShutdown;
+    InterruptHandle interruptHandle;
+    ::sigset_t sigMask;
 
     static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
         switch (dir) {
@@ -193,15 +230,41 @@
         epollFd(::epoll_create(DefaultFds)),
         isShutdown(false) {
         QPID_POSIX_CHECK(epollFd);
+        ::sigemptyset(&sigMask);
+        // Add always readable fd into our set (but not listening to it yet)
+        ::epoll_event epe;
+        epe.events = 0;
+        epe.data.u64 = 0;
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));   
     }
 
     ~PollerPrivate() {
         // It's probably okay to ignore any errors here as there can't be data loss
         ::close(epollFd);
     }
+    
+    void interrupt(bool all=false) {
+	    ::epoll_event epe;
+	    if (all) {
+	        // Not EPOLLONESHOT, so we eventually get all threads
+		    epe.events = ::EPOLLIN;
+		    epe.data.u64 = 0; // Keep valgrind happy
+	    } else {
+	    	// Use EPOLLONESHOT so we only wake a single thread
+		    epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+		    epe.data.u64 = 0; // Keep valgrind happy
+		    epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);	    
+	    }
+	    QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));	
+    }
+    
+    void interruptAll() {
+    	interrupt(true);
+    }
 };
 
 PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
 
 void Poller::addFd(PollerHandle& handle, Direction dir) {
     PollerHandlePrivate& eh = *handle.impl;
@@ -218,7 +281,7 @@
         epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
     }
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &eh;
+    epe.data.ptr = &handle;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
 
@@ -249,7 +312,7 @@
     ::epoll_event epe;
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &eh;
+    epe.data.ptr = &handle;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -266,7 +329,7 @@
     ::epoll_event epe;
     epe.events = eh.events;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &eh;
+    epe.data.ptr = &handle;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -281,28 +344,85 @@
     if (impl->isShutdown)
         return;
 
-    // Don't use any locking here - isshutdown will be visible to all
+    // Don't use any locking here - isShutdown will be visible to all
     // after the epoll_ctl() anyway (it's a memory barrier)
     impl->isShutdown = true;
 
-    // Add always readable fd to epoll (not EPOLLONESHOT)
-    int fd = impl->alwaysReadable.getFD();
-    ::epoll_event epe;
-    epe.events = ::EPOLLIN;
-    epe.data.u64 = 0; // Keep valgrind happy - don't strictly need next line now
-    epe.data.ptr = 0;
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+    impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+	{
+	    PollerHandlePrivate& eh = *handle.impl;
+	    ScopedLock<Mutex> l(eh.lock);
+	    if (eh.isInactive()) {
+	    	return false;
+	    }
+	    ::epoll_event epe;
+	    epe.events = 0;
+	    epe.data.u64 = 0; // Keep valgrind happy
+	    epe.data.ptr = &eh;
+	    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+	    eh.setInactive();
+	}
+
+	PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+    PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
+    ScopedLock<Mutex> l(eh.lock);
+	ih.addHandle(handle);
+    
+	impl->interrupt();
+    eh.setActive();
+    return true;
+}
+
+void Poller::run() {
+	// Make sure we can't be interrupted by signals at a bad time
+	::sigset_t ss;
+	::sigfillset(&ss);
+    ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+    do {
+        Event event = wait();
+
+        // If can read/write then dispatch appropriate callbacks        
+        if (event.handle) {
+            event.process();
+        } else {
+            // Handle shutdown
+            switch (event.type) {
+            case SHUTDOWN:
+                return;
+            default:
+                // This should be impossible
+                assert(false);
+            }
+        }
+    } while (true);
 }
 
 Poller::Event Poller::wait(Duration timeout) {
     epoll_event epe;
     int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+    AbsTime targetTimeout = 
+        (timeout == TIME_INFINITE) ?
+            FAR_FUTURE :
+            AbsTime(now(), timeout); 
 
-    // Repeat until we weren't interupted
+    // Repeat until we weren't interrupted by signal
     do {
         PollerHandleDeletionManager.markAllUnusedInThisThread();
+        // Need to run on kernels without epoll_pwait()
+        // - fortunately in this case we don't really need the atomicity of epoll_pwait()
+#if 1
+        sigset_t os;
+        pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os);
         int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
-
+        pthread_sigmask(SIG_SETMASK, &os, 0);
+#else
+        int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
+#endif
+        // Check for shutdown
         if (impl->isShutdown) {
             PollerHandleDeletionManager.markAllUnusedInThisThread();
             return Event(0, SHUTDOWN);
@@ -312,13 +432,27 @@
             QPID_POSIX_CHECK(rc);
         } else if (rc > 0) {
             assert(rc == 1);
-            PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(epe.data.ptr);
+            PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
 
+            PollerHandlePrivate& eh = *handle->impl;
             ScopedLock<Mutex> l(eh.lock);
-
+            
             // the handle could have gone inactive since we left the epoll_wait
             if (eh.isActive()) {
-                PollerHandle* handle = eh.pollerHandle;
+
+                // Check if this is an interrupt
+                if (handle == &impl->interruptHandle) {
+                	PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
+                	// If there is an interrupt queued behind this one we need to arm it
+                	// We do it this way so that another thread can pick it up
+                	if (impl->interruptHandle.queuedHandles()) {
+                		impl->interrupt();
+                		eh.setActive();
+                	} else {
+                		eh.setInactive();
+                	}
+                	return Event(wrappedHandle, INTERRUPTED);
+                }
 
                 // If the connection has been hungup we could still be readable
                 // (just not writable), allow us to readable until we get here again
@@ -349,10 +483,12 @@
         // The only things we can do here are return a timeout or wait more.
         // Obviously if we timed out we return timeout; if the wait was meant to
         // be indefinite then we should never return with a time out so we go again.
-        // If the wait wasn't indefinite, but we were interrupted then we have to return
-        // with a timeout as we don't know how long we've waited so far and so we can't
-        // continue the wait.
-        if (rc == 0 || timeoutMs != -1) {
+        // If the wait wasn't indefinite, we check whether we are after the target wait
+        // time or not
+        if (timeoutMs == -1) {
+            continue;
+        }
+        if (rc == 0 && now() > targetTimeout) {
             PollerHandleDeletionManager.markAllUnusedInThisThread();
             return Event(0, TIMEOUT);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Jan  6 15:42:18 2009
@@ -266,6 +266,7 @@
     virtual void queueWriteClose();
     virtual bool writeQueueEmpty();
     virtual void startReading();
+    virtual void requestCallback(RequestCallback);
     virtual BufferBase* getQueuedBuffer();
 
 private:
@@ -275,6 +276,7 @@
     void readable(DispatchHandle& handle);
     void writeable(DispatchHandle& handle);
     void disconnected(DispatchHandle& handle);
+    void requestedCall(RequestCallback);
     void close(DispatchHandle& handle);
 
 private:
@@ -386,6 +388,18 @@
     DispatchHandle::rewatchRead();
 }
 
+void AsynchIO::requestCallback(RequestCallback callback) {
+    // TODO creating a function object every time isn't all that
+    // efficient - if this becomes heavily used do something better (what?)
+    assert(callback);
+    DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback));
+}
+
+void AsynchIO::requestedCall(RequestCallback callback) {
+    assert(callback);
+    callback(*this);
+}
+
 /** Return a queued buffer if there are enough
  * to spare
  */

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp Tue Jan  6 15:42:18 2009
@@ -31,6 +31,8 @@
     return h->fd;
 }
 
+NullIOHandle DummyIOHandle;
+
 IOHandle::IOHandle(IOHandlePrivate* h) :
   impl(h)
 {}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h Tue Jan  6 15:42:18 2009
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/sys/Time.h"
+#include "qpid/sys/IOHandle.h"
 
 struct timespec;
 struct timeval;
@@ -47,6 +48,25 @@
 
 int toFd(const IOHandlePrivate* h);
 
+// Posix fd as an IOHandle
+class PosixIOHandle : public IOHandle {
+public:
+    PosixIOHandle(int fd) :
+        IOHandle(new IOHandlePrivate(fd))
+    {}
+};
+
+// Dummy IOHandle for places it's required in the API
+// but we promise not to actually try to do any operations on the IOHandle
+class NullIOHandle : public IOHandle {
+public:
+    NullIOHandle() :
+        IOHandle(new IOHandlePrivate)
+    {}
+};
+
+extern NullIOHandle DummyIOHandle;
+
 }}
 
 #endif  /*!_sys_posix_PrivatePosix_h*/

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Jan  6 15:42:18 2009
@@ -284,6 +284,7 @@
     virtual void queueWriteClose();
     virtual bool writeQueueEmpty();
     virtual void startReading();
+    virtual void requestCallback(RequestCallback);
 
     /**
      * getQueuedBuffer returns a buffer from the buffer queue, if one is
@@ -531,6 +532,11 @@
     return;
 }
 
+// TODO: This needs to arrange for a callback that is serialised with
+// the other IO callbacks for this AsynchIO
+void AsynchIO::requestCallback(RequestCallback callback) {
+}
+
 /**
  * Return a queued buffer if there are enough to spare.
  */

Modified: qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Tue Jan  6 15:42:18 2009
@@ -20,7 +20,10 @@
  */
 
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/posix/PrivatePosix.h"
 #include "qpid/sys/Thread.h"
 
 #include <sys/types.h>
@@ -28,6 +31,7 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <errno.h>
+#include <signal.h>
 
 #include <iostream>
 #include <boost/bind.hpp>
@@ -74,7 +78,26 @@
     h.rewatch();
 }
 
-int main(int argc, char** argv)
+DispatchHandle* rh = 0;
+DispatchHandle* wh = 0;
+
+void rInterrupt(DispatchHandle&) {
+	cerr << "R";
+}
+
+void wInterrupt(DispatchHandle&) {
+	cerr << "W";	
+}
+
+DispatchHandle::Callback rcb = rInterrupt;
+DispatchHandle::Callback wcb = wInterrupt;
+
+void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) {
+	rh->call(rcb);
+	wh->call(wcb);
+}
+
+int main(int /*argc*/, char** /*argv*/)
 {
     // Create poller
     Poller::shared_ptr poller(new Poller);
@@ -82,12 +105,12 @@
     // Create dispatcher thread
     Dispatcher d(poller);
     Dispatcher d1(poller);
-    //Dispatcher d2(poller);
-    //Dispatcher d3(poller);
+    Dispatcher d2(poller);
+    Dispatcher d3(poller);
     Thread dt(d);
     Thread dt1(d1);
-    //Thread dt2(d2);
-    //Thread dt3(d3);
+    Thread dt2(d2);
+    Thread dt3(d3);
 
     // Setup sender and receiver
     int sv[2];
@@ -106,22 +129,58 @@
     for (int i = 0; i < 8; i++)
         testString += testString;
 
-    DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0);
-    DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString));    
+    PosixIOHandle f0(sv[0]);
+    PosixIOHandle f1(sv[1]);
 
-    rh.watch(poller);
-    wh.watch(poller);
+    rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0);
+    wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);    
 
-    // wait 2 minutes then shutdown
-    sleep(60);
+    rh->startWatch(poller);
+    wh->startWatch(poller);
+
+    // Set up a regular itimer interrupt
+    
+    // Block the timer signal (SIGRTMIN) in this thread
+    ::sigset_t sm;
+    ::sigemptyset(&sm);
+    ::sigaddset(&sm, SIGRTMIN);
+    ::pthread_sigmask(SIG_BLOCK, &sm, 0);
+    
+    // Signal handling
+    struct ::sigaction sa;
+    sa.sa_sigaction = timer_handler;
+    sa.sa_flags = SA_RESTART | SA_SIGINFO;
+    ::sigemptyset(&sa.sa_mask);
+    rc = ::sigaction(SIGRTMIN, &sa,0);
+    assert(rc == 0);
+    
+    ::sigevent se;
+    se.sigev_notify = SIGEV_SIGNAL;
+    se.sigev_signo = SIGRTMIN;
+    se.sigev_value.sival_ptr = 0;
+    timer_t timer;
+    rc = ::timer_create(CLOCK_REALTIME, &se, &timer);
+    assert(rc == 0);
+    
+    itimerspec ts = {
+    /*.it_value = */ {2, 0},  // s, ns
+    /*.it_interval = */ {2, 0}}; // s, ns
     
+    rc = ::timer_settime(timer, 0, &ts, 0);
+    assert(rc == 0);
+
+    // wait 1 minute then shutdown
+    ::sleep(60);
+
+    rc = ::timer_delete(timer);
+    assert(rc == 0);
     poller->shutdown();
     dt.join();
     dt1.join();
-    //dt2.join();
-    //dt3.join();
+    dt2.join();
+    dt3.join();
 
-    cout << "Wrote: " << writtenBytes << "\n";
+    cout << "\nWrote: " << writtenBytes << "\n";
     cout << "Read: " << readBytes << "\n";
     
     return 0;

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan  6 15:42:18 2009
@@ -187,6 +187,13 @@
 sender_SOURCES=sender.cpp  TestOptions.h ConnectionOptions.h
 sender_LDADD=$(lib_client) 
 
+check_PROGRAMS+=PollerTest
+PollerTest_SOURCES=PollerTest.cpp
+PollerTest_LDADD=$(lib_common)
+
+check_PROGRAMS+=DispatcherTest
+DispatcherTest_SOURCES=DispatcherTest.cpp
+DispatcherTest_LDADD=$(lib_common)
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
 

Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=732177&r1=732176&r2=732177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue Jan  6 15:42:18 2009
@@ -23,7 +23,9 @@
  * Use socketpair to test the poller
  */
 
+#include "qpid/sys/IOHandle.h"
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/posix/PrivatePosix.h"
 
 #include <string>
 #include <iostream>
@@ -67,7 +69,7 @@
     return bytesRead;
 }
 
-int main(int argc, char** argv)
+int main(int /*argc*/, char** /*argv*/)
 {
     try 
     {
@@ -103,15 +105,18 @@
 
         auto_ptr<Poller> poller(new Poller);
         
-        PollerHandle h0(sv[0]);
-        PollerHandle h1(sv[1]);
+        PosixIOHandle f0(sv[0]);
+        PosixIOHandle f1(sv[1]);
+
+        PollerHandle h0(f0);
+        PollerHandle h1(f1);
         
         poller->addFd(h0, Poller::INOUT);
         
-        // Wait for 500ms - h0 should be writable
+        // h0 should be writable
         Poller::Event event = poller->wait();
         assert(event.handle == &h0);
-        assert(event.dir == Poller::OUT);
+        assert(event.type == Poller::WRITABLE);
         
         // Write as much as we can to socket 0
         bytesWritten = writeALot(sv[0], testString);
@@ -126,17 +131,48 @@
         poller->addFd(h1, Poller::INOUT);
         event = poller->wait();
         assert(event.handle == &h1);
-        assert(event.dir == Poller::INOUT);
+        assert(event.type == Poller::READ_WRITABLE);
+        
+        // Can't interrupt, it's not active
+        assert(poller->interrupt(h1) == false);
         
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
         cout << "Read(1): " << bytesRead << " bytes\n";
+
+        // Test poller interrupt
+        assert(poller->interrupt(h0) == true);
+        event = poller->wait();
+        assert(event.handle == &h0);
+        assert(event.type == Poller::INTERRUPTED);
+        assert(poller->interrupt(h0) == false);
+
+        // Test multiple interrupts
+        poller->rearmFd(h0);
+        poller->rearmFd(h1);
+        assert(poller->interrupt(h0) == true);
+        assert(poller->interrupt(h1) == true);
+        
+        // Make sure we can't interrupt them again
+        assert(poller->interrupt(h0) == false);
+        assert(poller->interrupt(h1) == false);
         
+        // Make sure that they both come out in the correct order
+        event = poller->wait();
+        assert(event.handle == &h0);
+        assert(event.type == Poller::INTERRUPTED);
+        assert(poller->interrupt(h0) == false);
+        event = poller->wait();
+        assert(event.handle == &h1);
+        assert(event.type == Poller::INTERRUPTED);
+        assert(poller->interrupt(h1) == false);
+
         // At this point h1 should have been disabled from the poller
         // (as it was just returned) and h0 can write again
+        poller->rearmFd(h0);
         event = poller->wait();
         assert(event.handle == &h0);
-        assert(event.dir == Poller::OUT);    
+        assert(event.type == Poller::WRITABLE);    
 
         // Now both the handles should be disabled
         event = poller->wait(500000000);
@@ -146,11 +182,11 @@
         poller->shutdown();
         event = poller->wait();
         assert(event.handle == 0);
-        assert(event.dir == Poller::SHUTDOWN);
+        assert(event.type == Poller::SHUTDOWN);
 
         event = poller->wait();
         assert(event.handle == 0);
-        assert(event.dir == Poller::SHUTDOWN);
+        assert(event.type == Poller::SHUTDOWN);
 
         poller->delFd(h1);
         poller->delFd(h0);