You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/05/04 17:55:22 UTC

svn commit: r771338 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/ qpid/sys/ qpid/sys/epoll/ qpid/sys/posix/ qpid/sys/windows/ tests/

Author: astitcher
Date: Mon May  4 15:55:21 2009
New Revision: 771338

URL: http://svn.apache.org/viewvc?rev=771338&view=rev
Log:
Refactored the DispatchHandle/Poller code to remove a long standing
set of race conditions.
- Changed Poller naming for better clarity with
  new semantics.
- Changed Poller semantics to avoid DispatchHandle
  keeping so much state
- Changed Poller so that it will never re-enable a
  Handle until Poller::wait is called again on the same thread
  that returned the Handle.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
    qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
    qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Mon May  4 15:55:21 2009
@@ -36,6 +36,10 @@
       )
 {}
     
+PollerDispatch::~PollerDispatch() {
+    dispatchHandle.stopWatch();
+}
+
 void PollerDispatch::start() {
     dispatchHandle.startWatch(poller);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Mon May  4 15:55:21 2009
@@ -37,6 +37,9 @@
   public:
     PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller,
                    boost::function<void()> onError) ;
+
+    ~PollerDispatch();
+
     void start();
 
   private:
@@ -47,7 +50,7 @@
     Cpg& cpg;
     boost::shared_ptr<sys::Poller> poller;
     boost::function<void()> onError;
-    sys::DispatchHandle dispatchHandle;
+    sys::DispatchHandleRef dispatchHandle;
 
 
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon May  4 15:55:21 2009
@@ -30,6 +30,16 @@
 namespace qpid {
 namespace sys {
 
+DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+  PollerHandle(h),
+  readableCallback(rCb),
+  writableCallback(wCb),
+  disconnectedCallback(dCb),
+  state(IDLE)
+{
+}
+
+
 DispatchHandle::~DispatchHandle() {
 }
 
@@ -38,123 +48,56 @@
     bool w = writableCallback;
 
     ScopedLock<Mutex> lock(stateLock);
-    assert(state == IDLE || state == DELAYED_IDLE);
-
-    // If no callbacks set then do nothing (that is what we were asked to do!)
-    // TODO: Maybe this should be an assert instead
-    if (!r && !w) {
-        switch (state) {
-        case IDLE:
-            state = INACTIVE;
-            return;
-        case DELAYED_IDLE:
-            state = DELAYED_INACTIVE;
-            return;
-        default:
-            assert(state == IDLE || state == DELAYED_IDLE);    
-        }        
-    }
-
-    Poller::Direction d = r ?
-        (w ? Poller::INOUT : Poller::INPUT) :
-        Poller::OUTPUT;
+    assert(state == IDLE);
 
     poller = poller0;
-    poller->addFd(*this, d);
-    
-    switch (state) {
-    case IDLE:
-        state = r ?
-            (w ? ACTIVE_RW : ACTIVE_R) :
-            ACTIVE_W;
-        return;
-    case DELAYED_IDLE:
-        state = r ?
-            (w ? DELAYED_RW : DELAYED_R) :
-            DELAYED_W;
-        return;
-        default:
-            assert(state == IDLE || state == DELAYED_IDLE);            
-    }   
+    poller->registerHandle(*this);
+    state = WAITING;
+    Poller::Direction dir = r ? 
+        ( w ? Poller::INOUT : Poller::INPUT ) : 
+        ( w ? Poller::OUTPUT : Poller::NONE ); 
+    poller->monitorHandle(*this, dir);
 }
 
 void DispatchHandle::rewatch() {
     bool r = readableCallback;
     bool w = writableCallback;
+    if (!r && !w) {
+        return;
+    }
+    Poller::Direction dir = r ? 
+        ( w ? Poller::INOUT : Poller::INPUT ) : 
+        ( w ? Poller::OUTPUT : Poller::NONE ); 
 
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_INACTIVE:
-        state = r ?
-            (w ? DELAYED_RW : DELAYED_R) :
-            DELAYED_W;
-        break;
-    case DELAYED_DELETE:
-        break;
-    case INACTIVE:
-    case ACTIVE_R:
-    case ACTIVE_W: {
-        assert(poller);
-        Poller::Direction d = r ?
-            (w ? Poller::INOUT : Poller::INPUT) :
-            Poller::OUTPUT;
-        poller->modFd(*this, d);
-        state = r ?
-            (w ? ACTIVE_RW : ACTIVE_R) :
-            ACTIVE_W;
-        break;
-        }
-    case DELAYED_RW:
-    case ACTIVE_RW:
-        // Don't need to do anything already waiting for readable/writable
+    case STOPPING:
+    case DELETING:
+        return;
+    default:
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
     }
+    assert(poller);
+    poller->monitorHandle(*this, dir);
 }
 
 void DispatchHandle::rewatchRead() {
     if (!readableCallback) {
         return;
     }
-    
+
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_RW:
-    case DELAYED_DELETE:
-        break;
-    case DELAYED_W:
-        state = DELAYED_RW;
-        break;
-    case DELAYED_INACTIVE:
-        state = DELAYED_R;
-        break;
-    case ACTIVE_R:
-    case ACTIVE_RW:
-        // Nothing to do: already waiting for readable
-        break;
-    case INACTIVE:
-        assert(poller);
-        poller->modFd(*this, Poller::INPUT);
-        state = ACTIVE_R;
-        break;
-    case ACTIVE_W:
-        assert(poller);
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
+    case STOPPING:
+    case DELETING:
+        return;
+    default:
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
     }
+    assert(poller);
+    poller->monitorHandle(*this, Poller::INPUT);
 }
 
 void DispatchHandle::rewatchWrite() {
@@ -165,35 +108,14 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_DELETE:
-        break;
-    case DELAYED_R:
-        state = DELAYED_RW;
-        break;
-    case DELAYED_INACTIVE:
-        state = DELAYED_W;
-        break;
-    case INACTIVE:
-        assert(poller);
-        poller->modFd(*this, Poller::OUTPUT);
-        state = ACTIVE_W;
-        break;
-    case ACTIVE_R:
-        assert(poller);
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
+    case STOPPING:
+    case DELETING:
+        return;
+    default:
         break;
-    case ACTIVE_W:
-    case ACTIVE_RW:
-        // Nothing to do: already waiting for writable
-        break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
-   }
+    }
+    assert(poller);
+    poller->monitorHandle(*this, Poller::OUTPUT);
 }
 
 void DispatchHandle::unwatchRead() {
@@ -204,34 +126,14 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_RW:
-        state = DELAYED_W;    
-        break;
-    case DELAYED_W:
-    case DELAYED_INACTIVE:
-    case DELAYED_DELETE:
-        break;
-    case ACTIVE_R:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
-        break;
-    case ACTIVE_RW:
-        assert(poller);
-        poller->modFd(*this, Poller::OUTPUT);
-        state = ACTIVE_W;
-        break;
-    case ACTIVE_W:
-    case INACTIVE:
+    case STOPPING:
+    case DELETING:
+        return;
+    default:
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
     }
+    assert(poller);
+    poller->unmonitorHandle(*this, Poller::INPUT);
 }
 
 void DispatchHandle::unwatchWrite() {
@@ -242,95 +144,62 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_W:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_RW:
-        state = DELAYED_R;
-        break;
-    case DELAYED_R:
-    case DELAYED_INACTIVE:
-    case DELAYED_DELETE:
-        break;
-    case ACTIVE_W:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
-        break;
-    case ACTIVE_RW:
-        assert(poller);
-        poller->modFd(*this, Poller::INPUT);
-        state = ACTIVE_R;
-        break;
-    case ACTIVE_R:
-    case INACTIVE:
+    case STOPPING:
+    case DELETING:
+        return;
+    default:
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
-   }
+    }
+    assert(poller);
+    poller->unmonitorHandle(*this, Poller::OUTPUT);
 }
 
 void DispatchHandle::unwatch() {
     ScopedLock<Mutex> lock(stateLock);
-    switch (state) {
+    switch(state) {
     case IDLE:
-    case DELAYED_IDLE:
-        break;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-        state = DELAYED_INACTIVE;
-        break;
-    case DELAYED_DELETE:
-        break;
+    case STOPPING:
+    case DELETING:
+        return;
     default:
-        assert(poller);
-        poller->modFd(*this, Poller::NONE);
-        state = INACTIVE;
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
-    }            
+    }
+    assert(poller);
+    poller->unmonitorHandle(*this, Poller::INOUT);
 }
 
 void DispatchHandle::stopWatch() {
     ScopedLock<Mutex> lock(stateLock);
     switch (state) {
     case IDLE:
-    case DELAYED_IDLE:
-    case DELAYED_DELETE:
+        assert(state != IDLE);
+        return;
+    case STOPPING:
+        assert(state != STOPPING);
         return;
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-        state = DELAYED_IDLE;
+    case CALLING:
+        state = STOPPING;
         break;
-    default:
+    case WAITING:
         state = IDLE;
         break;
-    case ACTIVE_DELETE:
-        assert(state != ACTIVE_DELETE);
+    case DELETING:
+        return;
     }
     assert(poller);
-    poller->delFd(*this);
+    poller->unregisterHandle(*this);
     poller.reset();
 }
 
-// If we are already in the IDLE state we can't do the callback as we might
-// race to delete and callback at the same time
-// TODO: might be able to fix this by adding a new state, but would make
-// the state machine even more complex
+// If we are in the IDLE/STOPPING state we can't do the callback as we've
+// not/no longer got the fd registered in any poller
 void DispatchHandle::call(Callback iCb) {
     assert(iCb);
     ScopedLock<Mutex> lock(stateLock);
     switch (state) {
     case IDLE:
-    case ACTIVE_DELETE:
-        assert(false);
+    case STOPPING:
+    case DELETING:
         return;
     default:
         interruptedCallbacks.push(iCb);
@@ -347,27 +216,24 @@
     ScopedLock<Mutex> lock(stateLock);
     // Ensure that we're no longer watching anything
     switch (state) {
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-        assert(poller);
-        poller->delFd(*this);
-        poller.reset();
-        // Fallthrough
-    case DELAYED_IDLE:
-        state = DELAYED_DELETE;
-        // Fallthrough
-    case DELAYED_DELETE:
-    case ACTIVE_DELETE:
-        return;
     case IDLE:
+        state = DELETING;
         break;
-    default:
-        state = ACTIVE_DELETE;
+    case STOPPING:
+        state = DELETING;
+        return;
+    case WAITING:
+        state = DELETING;
         assert(poller);
         (void) poller->interrupt(*this);
-        poller->delFd(*this);
+        poller->unregisterHandle(*this);
+        return;
+    case CALLING:
+        state = DELETING;
+        assert(poller);
+        poller->unregisterHandle(*this);
+        return;
+    case DELETING:
         return;
     }
     }
@@ -378,43 +244,28 @@
 void DispatchHandle::processEvent(Poller::EventType type) {
     CallbackQueue callbacks;
 
-    // Note that we are now doing the callbacks
+    // Phase I
     {
     ScopedLock<Mutex> lock(stateLock);
     
-    // Set up to wait for same events next time unless reset
     switch(state) {
-    case ACTIVE_R:
-        state = DELAYED_R;
-        break;
-    case ACTIVE_W:
-        state = DELAYED_W;
-        break;
-    case ACTIVE_RW:
-        state = DELAYED_RW;
+    case IDLE:
+        // Can get here if a non connection thread stops watching
+        // whilst we were stuck in the above lock 
+        return;
+    case WAITING:
+        state = CALLING;
         break;
-    case ACTIVE_DELETE:
+    case CALLING:
+        assert(state!=CALLING);
+        return;
+    case STOPPING:
+        assert(state!=STOPPING);
+        return;
+    case DELETING:
         // Need to make sure we clean up any pending callbacks in this case
         std::swap(callbacks, interruptedCallbacks);
         goto saybyebye;
-    // Can get here in idle if we are stopped in a different thread
-    // just after we return with this handle in Poller::wait
-    case IDLE:
-    // Can get here in INACTIVE if a non connection thread unwatches
-    // whilst we were stuck in the above lock 
-    case INACTIVE:
-    // Can only get here in a DELAYED_* state in the rare case
-    // that we're already here for reading and we get activated for
-    // writing and we can write (it might be possible the other way
-    // round too). In this case we're already processing the handle
-    // in a different thread in this function so return right away
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case DELAYED_INACTIVE:
-    case DELAYED_IDLE:
-    case DELAYED_DELETE:
-        return;
     }
     
     std::swap(callbacks, interruptedCallbacks);
@@ -434,10 +285,9 @@
         readableCallback(*this);
         writableCallback(*this);
         break;
-    case Poller::DISCONNECTED:
-        {
+    case Poller::DISCONNECTED: {
         ScopedLock<Mutex> lock(stateLock);
-        state = DELAYED_INACTIVE;
+        poller->unmonitorHandle(*this, Poller::INOUT);
         }
         if (disconnectedCallback) {
             disconnectedCallback(*this);
@@ -466,32 +316,20 @@
 
     {
     ScopedLock<Mutex> lock(stateLock);
-    // If any of the callbacks re-enabled reading/writing then actually
-    // do it now
     switch (state) {
-    case DELAYED_R:
-        poller->modFd(*this, Poller::INPUT);
-        state = ACTIVE_R;
-        return;
-    case DELAYED_W:
-        poller->modFd(*this, Poller::OUTPUT);
-        state = ACTIVE_W;
-        return;
-    case DELAYED_RW:
-        poller->modFd(*this, Poller::INOUT);
-        state = ACTIVE_RW;
-        return;
-    case DELAYED_INACTIVE:
-        state = INACTIVE;
-        return;
-    case DELAYED_IDLE:
-    	state = IDLE;
-    	return;
-    default:
-        // This should be impossible
-        assert(false);
+    case IDLE:
+        assert(state!=IDLE);
+        return;
+    case STOPPING:
+        state = IDLE;
+        return;
+    case WAITING:
+        assert(state!=WAITING);
+        return;
+    case CALLING:
+        state = WAITING;
         return;
-    case DELAYED_DELETE:
+    case DELETING:
         break;
     }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Mon May  4 15:55:21 2009
@@ -64,10 +64,11 @@
     Poller::shared_ptr poller;
     Mutex stateLock;
     enum {
-        IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
-        ACTIVE_DELETE,
-        DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
-        DELAYED_DELETE
+        IDLE,
+        STOPPING,
+        WAITING,
+        CALLING,
+        DELETING
     } state;
 
 public:
@@ -83,14 +84,7 @@
      *@param wCb Callback called when the handle is writable.
      *@param dCb Callback called when the handle is disconnected.
      */
-    QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
-      PollerHandle(h),
-      readableCallback(rCb),
-      writableCallback(wCb),
-      disconnectedCallback(dCb),
-      state(IDLE)
-    {}
-
+    QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb);
     QPID_COMMON_EXTERN ~DispatchHandle();
 
     /** Add this DispatchHandle to the poller to be watched. */
@@ -122,7 +116,6 @@
     QPID_COMMON_EXTERN void call(Callback iCb);
 
 protected:
-    /** Override to get extra processing done when the DispatchHandle is deleted. */
     QPID_COMMON_EXTERN void doDelete();
 
 private:

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon May  4 15:55:21 2009
@@ -94,10 +94,10 @@
     // Poller run loop
     QPID_COMMON_EXTERN void run();
 
-    QPID_COMMON_EXTERN void addFd(PollerHandle& handle, Direction dir);
-    QPID_COMMON_EXTERN void delFd(PollerHandle& handle);
-    QPID_COMMON_EXTERN void modFd(PollerHandle& handle, Direction dir);
-    QPID_COMMON_EXTERN void rearmFd(PollerHandle& handle);
+    QPID_COMMON_EXTERN void registerHandle(PollerHandle& handle);
+    QPID_COMMON_EXTERN void unregisterHandle(PollerHandle& handle);
+    QPID_COMMON_EXTERN void monitorHandle(PollerHandle& handle, Direction dir);
+    QPID_COMMON_EXTERN void unmonitorHandle(PollerHandle& handle, Direction dir);
     QPID_COMMON_EXTERN Event wait(Duration timeout = TIME_INFINITE);
 };
 
@@ -108,6 +108,7 @@
 class PollerHandlePrivate;
 class PollerHandle {
     friend class Poller;
+    friend class PollerPrivate;
     friend struct Poller::Event;
 
     PollerHandlePrivate* const impl;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon May  4 15:55:21 2009
@@ -46,6 +46,7 @@
 
 class PollerHandlePrivate {
     friend class Poller;
+    friend class PollerPrivate;
     friend class PollerHandle;
 
     enum FDStat {
@@ -55,6 +56,7 @@
         HUNGUP,
         MONITORED_HUNGUP,
         INTERRUPTED,
+        INTERRUPTED_HUNGUP,
         DELETED
     };
 
@@ -76,7 +78,9 @@
     }
 
     void setActive() {
-        stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+        stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) 
+            ? MONITORED_HUNGUP
+            : MONITORED;
     }
 
     bool isInactive() const {
@@ -96,7 +100,10 @@
     }
 
     bool isHungup() const {
-        return stat == MONITORED_HUNGUP || stat == HUNGUP;
+        return
+            stat == MONITORED_HUNGUP ||
+            stat == HUNGUP ||
+            stat == INTERRUPTED_HUNGUP;
     }
 
     void setHungup() {
@@ -105,11 +112,13 @@
     }
 
     bool isInterrupted() const {
-        return stat == INTERRUPTED;
+        return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
     }
 
     void setInterrupted() {
-        stat = INTERRUPTED;
+        stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+            ? INTERRUPTED_HUNGUP
+            : INTERRUPTED;
     }
 
     bool isDeleted() const {
@@ -131,13 +140,13 @@
     if (impl->isDeleted()) {
     	return;
     }
+    impl->pollerHandle = 0;
     if (impl->isInterrupted()) {
         impl->setDeleted();
         return;
     }
-    if (impl->isActive()) {
-        impl->setDeleted();
-    }
+    assert(impl->isIdle());
+    impl->setDeleted();
     }
     PollerHandleDeletionManager.markForDeletion(impl);
 }
@@ -256,8 +265,13 @@
     ~PollerPrivate() {
         // It's probably okay to ignore any errors here as there can't be data loss
         ::close(epollFd);
+        
+        // Need to put the interruptHandle in idle state to delete it
+        static_cast<PollerHandle&>(interruptHandle).impl->setIdle();
     }
-    
+
+    void resetMode(PollerHandlePrivate& handle);
+
     void interrupt() {
 	    ::epoll_event epe;
     	// Use EPOLLONESHOT so we only wake a single thread
@@ -279,74 +293,122 @@
 PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
 int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
 
-void Poller::addFd(PollerHandle& handle, Direction dir) {
+void Poller::registerHandle(PollerHandle& handle) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
-    ::epoll_event epe;
-    int op;
+    assert(eh.isIdle());
 
-    if (eh.isIdle()) {
-        op = EPOLL_CTL_ADD;
-        epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
-    } else {
-        assert(eh.isActive());
-        op = EPOLL_CTL_MOD;
-        epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
-    }
+    ::epoll_event epe;
+    epe.events = ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
 
-    // Record monitoring state of this fd
-    eh.events = epe.events;
     eh.setActive();
 }
 
-void Poller::delFd(PollerHandle& handle) {
+void Poller::unregisterHandle(PollerHandle& handle) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
+
     int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required result!
     // And allows the case where a sloppy program closes the fd and then does the delFd()
     if (rc == -1 && errno != EBADF) {
 	    QPID_POSIX_CHECK(rc);
     }
+
     eh.setIdle();
 }
 
-// modFd is equivalent to delFd followed by addFd
-void Poller::modFd(PollerHandle& handle, Direction dir) {
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+    PollerHandle* ph;
+    {
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isActive());
+
+    if (eh.isIdle() || eh.isDeleted()) {
+        return;
+    }
+    
+    if (eh.events==0) {
+        eh.setActive();
+        return;        
+    }
+
+    if (!eh.isInterrupted()) {
+        ::epoll_event epe;
+        epe.events = eh.events | ::EPOLLONESHOT;
+        epe.data.u64 = 0; // Keep valgrind happy
+        epe.data.ptr = &eh;
+    
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+    
+        eh.setActive();
+        return;        
+    }
+    ph = eh.pollerHandle;
+    }
+
+    PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl;
+    ScopedLock<Mutex> l(ihp.lock);
+    interruptHandle.addHandle(*ph);
+    ihp.setActive();
+    interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
 
+    ::__uint32_t oldEvents = eh.events;
+    eh.events |= PollerPrivate::directionToEpollEvent(dir);
+    
+    // If no change nothing more to do - avoid unnecessary system call
+    if (oldEvents==eh.events) {
+        return;
+    }
+
+    // If we're not actually listening wait till we are to perform change
+    if (!eh.isActive()) {
+        return;
+    }
+    
     ::epoll_event epe;
-    epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+    epe.events = eh.events | ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
-    // Record monitoring state of this fd
-    eh.events = epe.events;
-    eh.setActive();
 }
 
-void Poller::rearmFd(PollerHandle& handle) {
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
-    assert(eh.isInactive());
+    assert(!eh.isIdle());
 
+    ::__uint32_t oldEvents = eh.events;
+    eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+    // If no change nothing more to do - avoid unnecessary system call
+    if (oldEvents==eh.events) {
+        return;
+    }
+    
+    // If we're not actually listening wait till we are to perform change
+    if (!eh.isActive()) {
+        return;
+    }
+    
     ::epoll_event epe;
-    epe.events = eh.events;
+    epe.events = eh.events | ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
-    eh.setActive();
 }
 
 void Poller::shutdown() {
@@ -368,14 +430,25 @@
 	{
 	    PollerHandlePrivate& eh = *handle.impl;
 	    ScopedLock<Mutex> l(eh.lock);
-	    if (!eh.isActive()) {
+	    if (eh.isIdle() || eh.isDeleted()) {
 	    	return false;
 	    }
+        
+        if (eh.isInterrupted()) {
+            return true;
+        }
+        
+        // Stop monitoring handle for read or write
 	    ::epoll_event epe;
 	    epe.events = 0;
 	    epe.data.u64 = 0; // Keep valgrind happy
 	    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-	    eh.setInterrupted();
+
+        if (eh.isInactive()) {
+            eh.setInterrupted();
+            return true;
+        }
+        eh.setInterrupted();
 	}
 
 	PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -414,6 +487,7 @@
 }
 
 Poller::Event Poller::wait(Duration timeout) {
+    static __thread PollerHandlePrivate* lastReturnedHandle = 0;
     epoll_event epe;
     int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
     AbsTime targetTimeout = 
@@ -421,6 +495,11 @@
             FAR_FUTURE :
             AbsTime(now(), timeout); 
 
+    if (lastReturnedHandle) {
+        impl->resetMode(*lastReturnedHandle);
+        lastReturnedHandle = 0;
+    }
+
     // Repeat until we weren't interrupted by signal
     do {
         PollerHandleDeletionManager.markAllUnusedInThisThread();
@@ -460,12 +539,19 @@
                 }
                 }
                 if (wrappedHandle) {
-                    ScopedLock<Mutex> l(wrappedHandle->impl->lock);
-                    if (!wrappedHandle->impl->isDeleted()) {
-                        wrappedHandle->impl->setInactive();
+                    PollerHandlePrivate& eh = *wrappedHandle->impl;
+                    {
+                    ScopedLock<Mutex> l(eh.lock);
+                    if (!eh.isDeleted()) {
+                        if (!eh.isIdle()) {
+                            eh.setInactive();
+                        }
+                        lastReturnedHandle = &eh;
+                        assert(eh.pollerHandle == wrappedHandle);
                         return Event(wrappedHandle, INTERRUPTED);
                     }
-                    PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+                    }
+                    PollerHandleDeletionManager.markForDeletion(&eh);
                 }
                 continue;
             }
@@ -482,6 +568,7 @@
             // the handle could have gone inactive since we left the epoll_wait
             if (eh.isActive()) {
                 PollerHandle* handle = eh.pollerHandle;
+                assert(handle);
 
                 // If the connection has been hungup we could still be readable
                 // (just not writable), allow us to readable until we get here again
@@ -493,15 +580,8 @@
                 } else {
                     eh.setInactive();
                 }
+                lastReturnedHandle = &eh;
                 return Event(handle, PollerPrivate::epollToDirection(epe.events));
-            } else if (eh.isDeleted()) {
-                // The handle has been deleted whilst still active and so must be removed
-                // from the poller
-                int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
-                // Ignore EBADF since it's quite likely that we could race with closing the fd
-                if (rc == -1 && errno != EBADF) {
-                    QPID_POSIX_CHECK(rc);
-                }
             }
         }
         // We only get here if one of the following:

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon May  4 15:55:21 2009
@@ -75,6 +75,7 @@
 class AsynchAcceptorPrivate {
 public:
     AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+    ~AsynchAcceptorPrivate();
     void start(Poller::shared_ptr poller);
 
 private:
@@ -109,6 +110,10 @@
     s.setNonblocking();
 }
 
+AsynchAcceptorPrivate::~AsynchAcceptorPrivate() {
+    handle.stopWatch();
+}
+
 void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
     handle.startWatch(poller);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon May  4 15:55:21 2009
@@ -146,7 +146,7 @@
 }
 
 void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
-    poller->addFd(PollerHandle(socket), Poller::INPUT);
+    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
     restart ();
 }
 
@@ -426,7 +426,7 @@
 
 void AsynchIO::start(Poller::shared_ptr poller0) {
     poller = poller0;
-    poller->addFd(PollerHandle(socket), Poller::INPUT);
+    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
     if (writeQueue.size() > 0)  // Already have data queued for write
         notifyPendingWrite();
     startReading();
@@ -471,7 +471,7 @@
                              boost::bind(&AsynchIO::completion, this, _1));
     IOHandle h(hp);
     PollerHandle ph(h);
-    poller->addFd(ph, Poller::OUTPUT);
+    poller->monitorHandle(ph, Poller::OUTPUT);
 }
 
 void AsynchIO::queueWriteClose() {
@@ -559,7 +559,7 @@
                              callback);
     IOHandle h(hp);
     PollerHandle ph(h);
-    poller->addFd(ph, Poller::INPUT);
+    poller->monitorHandle(ph, Poller::INPUT);
 }
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Mon May  4 15:55:21 2009
@@ -122,7 +122,7 @@
     } while (true);
 }
 
-void Poller::addFd(PollerHandle& handle, Direction dir) {
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
     HANDLE h = (HANDLE)(handle.impl->fd);
     if (h != INVALID_HANDLE_VALUE) {
         HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0);
@@ -146,9 +146,9 @@
 }
 
 // All no-ops...
-void Poller::delFd(PollerHandle& handle) {}
-void Poller::modFd(PollerHandle& handle, Direction dir) {}
-void Poller::rearmFd(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
+void Poller::registerHandle(PollerHandle& handle) {}
+void Poller::unregisterHandle(PollerHandle& handle) {}
 
 Poller::Event Poller::wait(Duration timeout) {
     DWORD timeoutMs = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Mon May  4 15:55:21 2009
@@ -76,15 +76,15 @@
     if (!armed)
         return;
 
-    // addFd will queue a completion for the IOCP; when it's handled, a
+    // monitorHandle will queue a completion for the IOCP; when it's handled, a
     // poller thread will call back to dispatch() below.
     PollerHandle ph(*this);
-    poller->addFd(ph, Poller::INPUT);
+    poller->monitorHandle(ph, Poller::INPUT);
 }
 
 void PollableConditionPrivate::dispatch(AsynchIoResult *result)
 {
-    delete result;       // Poller::addFd() allocates this
+    delete result;       // Poller::monitorHandle() allocates this
     cb(parent);
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Mon May  4 15:55:21 2009
@@ -111,7 +111,8 @@
         PollerHandle h0(f0);
         PollerHandle h1(f1);
         
-        poller->addFd(h0, Poller::INOUT);
+        poller->registerHandle(h0);
+        poller->monitorHandle(h0, Poller::INOUT);
         
         // h0 should be writable
         Poller::Event event = poller->wait();
@@ -123,19 +124,16 @@
         cout << "Wrote(0): " << bytesWritten << " bytes\n";
         
         // Wait for 500ms - h0 no longer writable
-        poller->rearmFd(h0);
         event = poller->wait(500000000);
         assert(event.handle == 0);
 
         // Test we can read it all now
-        poller->addFd(h1, Poller::INOUT);
+        poller->registerHandle(h1);
+        poller->monitorHandle(h1, Poller::INOUT);
         event = poller->wait();
         assert(event.handle == &h1);
         assert(event.type == Poller::READ_WRITABLE);
         
-        // Can't interrupt, it's not active
-        assert(poller->interrupt(h1) == false);
-        
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
         cout << "Read(1): " << bytesRead << " bytes\n";
@@ -145,39 +143,52 @@
         event = poller->wait();
         assert(event.handle == &h0);
         assert(event.type == Poller::INTERRUPTED);
-        assert(poller->interrupt(h0) == false);
 
         // Test multiple interrupts
-        poller->rearmFd(h0);
-        poller->rearmFd(h1);
         assert(poller->interrupt(h0) == true);
         assert(poller->interrupt(h1) == true);
         
-        // Make sure we can't interrupt them again
-        assert(poller->interrupt(h0) == false);
-        assert(poller->interrupt(h1) == false);
+        // Make sure we can interrupt them again
+        assert(poller->interrupt(h0) == true);
+        assert(poller->interrupt(h1) == true);
         
-        // Make sure that they both come out in the correct order
+        // Make sure that they both come out
         event = poller->wait();
-        assert(event.handle == &h0);
         assert(event.type == Poller::INTERRUPTED);
-        assert(poller->interrupt(h0) == false);
+        assert(event.handle == &h0 || event.handle == &h1);
+        if (event.handle == &h0) {
+            event = poller->wait();
+            assert(event.type == Poller::INTERRUPTED);
+            assert(event.handle == &h1);
+        } else {
+            event = poller->wait();
+            assert(event.type == Poller::INTERRUPTED);
+            assert(event.handle == &h0);
+        }
+
+        poller->unmonitorHandle(h1, Poller::INOUT);
+
         event = poller->wait();
-        assert(event.handle == &h1);
-        assert(event.type == Poller::INTERRUPTED);
-        assert(poller->interrupt(h1) == false);
+        assert(event.handle == &h0);
+        assert(event.type == Poller::WRITABLE);    
 
-        // At this point h1 should have been disabled from the poller
-        // (as it was just returned) and h0 can write again
-        poller->rearmFd(h0);
+        // We didn't write anything so it should still be writable
         event = poller->wait();
         assert(event.handle == &h0);
         assert(event.type == Poller::WRITABLE);    
 
-        // Now both the handles should be disabled
+        poller->unmonitorHandle(h0, Poller::INOUT);
+
         event = poller->wait(500000000);
         assert(event.handle == 0);
         
+        poller->unregisterHandle(h1);
+        poller->unregisterHandle(h0);
+        
+        // Make sure we can't interrupt them now
+        assert(poller->interrupt(h0) == false);
+        assert(poller->interrupt(h1) == false);
+        
         // Test shutdown
         poller->shutdown();
         event = poller->wait();
@@ -188,9 +199,6 @@
         assert(event.handle == 0);
         assert(event.type == Poller::SHUTDOWN);
 
-        poller->delFd(h1);
-        poller->delFd(h0);
-        
         return 0;
     } catch (exception& e) {
         cout << "Caught exception  " << e.what() << "\n";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org