You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/03/02 19:42:02 UTC

svn commit: r749406 - in /qpid/trunk/qpid/cpp/src: qpid/client/Connector.cpp qpid/client/SslConnector.cpp qpid/sys/DispatchHandle.cpp qpid/sys/DispatchHandle.h qpid/sys/Poller.h qpid/sys/epoll/EpollPoller.cpp tests/DispatcherTest.cpp tests/Makefile.am

Author: astitcher
Date: Mon Mar  2 18:42:02 2009
New Revision: 749406

URL: http://svn.apache.org/viewvc?rev=749406&view=rev
Log:
- Reworked DispatchHandler state machine to eliminate race conditions,
  particularly when deleting a DispatchHandle
- Reworked Poller interrupt mechanism to eliminate locking problems and
  to support the DispatchHandler changes
- Beefed up the DispatchHandler test program so that it's a fair torture
  test of the DispatchHandler code

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
    qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Mon Mar  2 18:42:02 2009
@@ -220,6 +220,7 @@
     bool ret = !closed;
     if (!closed) {
         closed = true;
+        aio->queueForDeletion();
         poller->shutdown();
     }
     if (!joined && receiver.id() != Thread::current().id()) {
@@ -384,14 +385,13 @@
     assert(protect);
     try {
         Dispatcher d(poller);
-	
+
         for (int i = 0; i < 32; i++) {
             aio->queueReadBuffer(new Buff(maxFrameSize));
         }
-	
+
         aio->start(poller);
         d.run();
-        aio->queueForDeletion();
         socket.close();
     } catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Mon Mar  2 18:42:02 2009
@@ -221,6 +221,7 @@
     bool ret = !closed;
     if (!closed) {
         closed = true;
+        aio->queueForDeletion();
         poller->shutdown();
     }
     if (!joined && receiver.id() != Thread::current().id()) {
@@ -386,7 +387,6 @@
 	
         aio->start(poller);
         d.run();
-        aio->queueForDeletion();
         socket.close();
     } catch (const std::exception& e) {
         QPID_LOG(error, e.what());

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon Mar  2 18:42:02 2009
@@ -21,6 +21,8 @@
 
 #include "DispatchHandle.h"
 
+#include <algorithm>
+
 #include <boost/cast.hpp>
 
 #include <assert.h>
@@ -29,7 +31,6 @@
 namespace sys {
 
 DispatchHandle::~DispatchHandle() {
-    stopWatch();
 }
 
 void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
@@ -37,13 +38,21 @@
     bool w = writableCallback;
 
     ScopedLock<Mutex> lock(stateLock);
-    assert(state == IDLE);
+    assert(state == IDLE || state == DELAYED_IDLE);
 
     // If no callbacks set then do nothing (that is what we were asked to do!)
     // TODO: Maybe this should be an assert instead
     if (!r && !w) {
-        state = INACTIVE;
-        return;
+        switch (state) {
+        case IDLE:
+            state = INACTIVE;
+            return;
+        case DELAYED_IDLE:
+            state = DELAYED_INACTIVE;
+            return;
+        default:
+            assert(state == IDLE || state == DELAYED_IDLE);    
+        }        
     }
 
     Poller::Direction d = r ?
@@ -53,9 +62,20 @@
     poller = poller0;
     poller->addFd(*this, d);
     
-    state = r ?
-        (w ? ACTIVE_RW : ACTIVE_R) :
-        ACTIVE_W;
+    switch (state) {
+    case IDLE:
+        state = r ?
+            (w ? ACTIVE_RW : ACTIVE_R) :
+            ACTIVE_W;
+        return;
+    case DELAYED_IDLE:
+        state = r ?
+            (w ? DELAYED_RW : DELAYED_R) :
+            DELAYED_W;
+        return;
+        default:
+            assert(state == IDLE || state == DELAYED_IDLE);            
+    }   
 }
 
 void DispatchHandle::rewatch() {
@@ -93,6 +113,8 @@
     case ACTIVE_RW:
         // Don't need to do anything already waiting for readable/writable
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -130,6 +152,8 @@
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -167,6 +191,8 @@
     case ACTIVE_RW:
         // Nothing to do: already waiting for writable
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
    }
 }
 
@@ -203,6 +229,8 @@
     case ACTIVE_W:
     case INACTIVE:
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -239,6 +267,8 @@
     case ACTIVE_R:
     case INACTIVE:
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
    }
 }
 
@@ -261,6 +291,8 @@
         poller->modFd(*this, Poller::NONE);
         state = INACTIVE;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }            
 }
 
@@ -280,47 +312,72 @@
     default:
         state = IDLE;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
     assert(poller);
     poller->delFd(*this);
     poller.reset();
 }
 
+// If we are already in the IDLE state we can't do the callback as we might
+// race to delete and callback at the same time
+// TODO: might be able to fix this by adding a new state, but would make
+// the state machine even more complex
 void DispatchHandle::call(Callback iCb) {
     assert(iCb);
     ScopedLock<Mutex> lock(stateLock);
-    interruptedCallbacks.push(iCb);
-    
-    (void) poller->interrupt(*this);
+    switch (state) {
+    case IDLE:
+    case ACTIVE_DELETE:
+        assert(false);
+        return;
+    default:
+        interruptedCallbacks.push(iCb);
+        assert(poller);
+        (void) poller->interrupt(*this);
+    }
 }
 
 // The slightly strange switch structure
 // is to ensure that the lock is released before
 // we do the delete
 void DispatchHandle::doDelete() {
-    // Ensure that we're no longer watching anything
-    stopWatch();
-
-    // If we're in the middle of a callback defer the delete
     {
     ScopedLock<Mutex> lock(stateLock);
+    // Ensure that we're no longer watching anything
     switch (state) {
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+        assert(poller);
+        poller->delFd(*this);
+        poller.reset();
+        // Fallthrough
     case DELAYED_IDLE:
-    case DELAYED_DELETE:
         state = DELAYED_DELETE;
+        // Fallthrough
+    case DELAYED_DELETE:
+    case ACTIVE_DELETE:
         return;
     case IDLE:
         break;
     default:
-        // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
-        assert(false);
+        state = ACTIVE_DELETE;
+        assert(poller);
+        (void) poller->interrupt(*this);
+        poller->delFd(*this);
+        return;
     }
     }
-    // If we're not then do it right away
+    // If we're IDLE we can do this right away
     delete this;
 }
 
 void DispatchHandle::processEvent(Poller::EventType type) {
+    CallbackQueue callbacks;
+
     // Note that we are now doing the callbacks
     {
     ScopedLock<Mutex> lock(stateLock);
@@ -336,6 +393,16 @@
     case ACTIVE_RW:
         state = DELAYED_RW;
         break;
+    case ACTIVE_DELETE:
+        // Need to make sure we clean up any pending callbacks in this case
+        std::swap(callbacks, interruptedCallbacks);
+        goto saybyebye;
+    // Can get here in idle if we are stopped in a different thread
+    // just after we return with this handle in Poller::wait
+    case IDLE:
+    // Can get here in INACTIVE if a non connection thread unwatches
+    // whilst we were stuck in the above lock 
+    case INACTIVE:
     // Can only get here in a DELAYED_* state in the rare case
     // that we're already here for reading and we get activated for
     // writing and we can write (it might be possible the other way
@@ -348,9 +415,9 @@
     case DELAYED_IDLE:
     case DELAYED_DELETE:
         return;
-    default:
-        assert(false);
     }
+    
+    std::swap(callbacks, interruptedCallbacks);
     }
 
     // Do callbacks - whilst we are doing the callbacks we are prevented from processing
@@ -378,8 +445,8 @@
         break;
     case Poller::INTERRUPTED:
         {
-        ScopedLock<Mutex> lock(stateLock);
-        assert(interruptedCallbacks.size() > 0);
+        // We could only be interrupted if we also had a callback to do
+        assert(callbacks.size() > 0);
         // We'll actually do the interrupt below
         }
         break;
@@ -387,16 +454,18 @@
         assert(false);
     }
 
-    {
-    ScopedLock<Mutex> lock(stateLock);
-    // If we've got a pending interrupt do it now
-    while (interruptedCallbacks.size() > 0) {
-        Callback cb = interruptedCallbacks.front();
+    // If we have any callbacks do them now -
+    // (because we use a copy from before the previous callbacks we won't
+    //  do anything yet that was just added) 
+    while (callbacks.size() > 0) {
+        Callback cb = callbacks.front();
         assert(cb);
         cb(*this);
-        interruptedCallbacks.pop();
+        callbacks.pop();
     }
 
+    {
+    ScopedLock<Mutex> lock(stateLock);
     // If any of the callbacks re-enabled reading/writing then actually
     // do it now
     switch (state) {
@@ -425,7 +494,9 @@
     case DELAYED_DELETE:
         break;
     }
-    }      
+    }
+
+saybyebye:
     delete this;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Mon Mar  2 18:42:02 2009
@@ -65,6 +65,7 @@
     Mutex stateLock;
     enum {
         IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+        ACTIVE_DELETE,
         DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
         DELAYED_DELETE
     } state;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon Mar  2 18:42:02 2009
@@ -86,8 +86,9 @@
     //   with the handle and the INTERRUPTED event type
     // if it returns false then the handle is not being monitored by the poller
     // - This can either be because it has just received an event which has been
-    //   reported and has not been reenabled since. Or because it was removed
-    //   from the monitoring set
+    //   reported and has not been reenabled since.
+    // - Because it was removed from the monitoring set
+    // - Or because it is already being interrupted 
     bool interrupt(PollerHandle& handle);
     
     // Poller run loop

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Mar  2 18:42:02 2009
@@ -54,17 +54,20 @@
         INACTIVE,
         HUNGUP,
         MONITORED_HUNGUP,
+        INTERRUPTED,
         DELETED
     };
 
     int fd;
     ::__uint32_t events;
+    PollerHandle* pollerHandle;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate(int f) :
+    PollerHandlePrivate(int f, PollerHandle* p) :
       fd(f),
       events(0),
+      pollerHandle(p),
       stat(ABSENT) {
     }
 
@@ -101,6 +104,14 @@
         stat = HUNGUP;
     }
 
+    bool isInterrupted() const {
+        return stat == INTERRUPTED;
+    }
+
+    void setInterrupted() {
+        stat = INTERRUPTED;
+    }
+
     bool isDeleted() const {
         return stat == DELETED;
     }
@@ -111,7 +122,7 @@
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl)))
+    impl(new PollerHandlePrivate(toFd(h.impl), this))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -120,6 +131,10 @@
     if (impl->isDeleted()) {
     	return;
     }
+    if (impl->isInterrupted()) {
+        impl->setDeleted();
+        return;
+    }
     if (impl->isActive()) {
         impl->setDeleted();
     }
@@ -243,23 +258,21 @@
         ::close(epollFd);
     }
     
-    void interrupt(bool all=false) {
+    void interrupt() {
 	    ::epoll_event epe;
-	    if (all) {
-	        // Not EPOLLONESHOT, so we eventually get all threads
-		    epe.events = ::EPOLLIN;
-		    epe.data.u64 = 0; // Keep valgrind happy
-	    } else {
-	    	// Use EPOLLONESHOT so we only wake a single thread
-		    epe.events = ::EPOLLIN | ::EPOLLONESHOT;
-		    epe.data.u64 = 0; // Keep valgrind happy
-		    epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);	    
-	    }
+    	// Use EPOLLONESHOT so we only wake a single thread
+	    epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+	    epe.data.u64 = 0; // Keep valgrind happy
+	    epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
 	    QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));	
     }
     
     void interruptAll() {
-    	interrupt(true);
+        ::epoll_event epe;
+        // Not EPOLLONESHOT, so we eventually get all threads
+        epe.events = ::EPOLLIN;
+        epe.data.u64 = 0; // Keep valgrind happy
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));  
     }
 };
 
@@ -281,7 +294,7 @@
         epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
     }
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
 
@@ -312,7 +325,7 @@
     ::epoll_event epe;
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -329,7 +342,7 @@
     ::epoll_event epe;
     epe.events = eh.events;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -355,15 +368,14 @@
 	{
 	    PollerHandlePrivate& eh = *handle.impl;
 	    ScopedLock<Mutex> l(eh.lock);
-	    if (eh.isInactive()) {
+	    if (!eh.isActive()) {
 	    	return false;
 	    }
 	    ::epoll_event epe;
 	    epe.events = 0;
 	    epe.data.u64 = 0; // Keep valgrind happy
-	    epe.data.ptr = &eh;
 	    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-	    eh.setInactive();
+	    eh.setInterrupted();
 	}
 
 	PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -422,37 +434,54 @@
 #else
         int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
 #endif
-        // Check for shutdown
-        if (impl->isShutdown) {
-            PollerHandleDeletionManager.markAllUnusedInThisThread();
-            return Event(0, SHUTDOWN);
-        }
 
         if (rc ==-1 && errno != EINTR) {
             QPID_POSIX_CHECK(rc);
         } else if (rc > 0) {
             assert(rc == 1);
-            PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+            void* dataPtr = epe.data.ptr;
 
-            PollerHandlePrivate& eh = *handle->impl;
+            // Check if this is an interrupt
+            PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
+            if (dataPtr == &interruptHandle) {
+                PollerHandle* wrappedHandle = 0;
+                {
+                ScopedLock<Mutex> l(interruptHandle.impl->lock);
+                if (interruptHandle.impl->isActive()) {
+                    wrappedHandle = interruptHandle.getHandle();
+                    // If there is an interrupt queued behind this one we need to arm it
+                    // We do it this way so that another thread can pick it up
+                    if (interruptHandle.queuedHandles()) {
+                        impl->interrupt();
+                        interruptHandle.impl->setActive();
+                    } else {
+                        interruptHandle.impl->setInactive();
+                    }
+                }
+                }
+                if (wrappedHandle) {
+                    ScopedLock<Mutex> l(wrappedHandle->impl->lock);
+                    if (!wrappedHandle->impl->isDeleted()) {
+                        wrappedHandle->impl->setInactive();
+                        return Event(wrappedHandle, INTERRUPTED);
+                    }
+                    PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+                }
+                continue;
+            }
+
+            // Check for shutdown
+            if (impl->isShutdown) {
+                PollerHandleDeletionManager.markAllUnusedInThisThread();
+                return Event(0, SHUTDOWN);
+            }
+
+            PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
             ScopedLock<Mutex> l(eh.lock);
             
             // the handle could have gone inactive since we left the epoll_wait
             if (eh.isActive()) {
-
-                // Check if this is an interrupt
-                if (handle == &impl->interruptHandle) {
-                	PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
-                	// If there is an interrupt queued behind this one we need to arm it
-                	// We do it this way so that another thread can pick it up
-                	if (impl->interruptHandle.queuedHandles()) {
-                		impl->interrupt();
-                		eh.setActive();
-                	} else {
-                		eh.setInactive();
-                	}
-                	return Event(wrappedHandle, INTERRUPTED);
-                }
+                PollerHandle* handle = eh.pollerHandle;
 
                 // If the connection has been hungup we could still be readable
                 // (just not writable), allow us to readable until we get here again

Modified: qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Mon Mar  2 18:42:02 2009
@@ -78,9 +78,6 @@
     h.rewatch();
 }
 
-DispatchHandle* rh = 0;
-DispatchHandle* wh = 0;
-
 void rInterrupt(DispatchHandle&) {
 	cerr << "R";
 }
@@ -92,9 +89,28 @@
 DispatchHandle::Callback rcb = rInterrupt;
 DispatchHandle::Callback wcb = wInterrupt;
 
+DispatchHandleRef *volatile rh = 0;
+DispatchHandleRef *volatile wh = 0;
+
+volatile bool stopWait = false;
+volatile bool phase1finished = false;
+
+timer_t timer;
+
+void stop_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) {
+    stopWait = true;
+}
+
 void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) {
-	rh->call(rcb);
-	wh->call(wcb);
+    static int count = 0;
+    if (count++ < 10) {
+        rh->call(rcb);
+	   wh->call(wcb);
+    } else {
+        phase1finished = true;
+        int rc = ::timer_delete(timer);
+        assert(rc == 0);        
+    }
 }
 
 int main(int /*argc*/, char** /*argv*/)
@@ -132,8 +148,8 @@
     PosixIOHandle f0(sv[0]);
     PosixIOHandle f1(sv[1]);
 
-    rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0);
-    wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);    
+    rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0);
+    wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);    
 
     rh->startWatch(poller);
     wh->startWatch(poller);
@@ -154,11 +170,10 @@
     rc = ::sigaction(SIGRTMIN, &sa,0);
     assert(rc == 0);
     
-    ::sigevent se;
+    ::sigevent se={};
     se.sigev_notify = SIGEV_SIGNAL;
     se.sigev_signo = SIGRTMIN;
     se.sigev_value.sival_ptr = 0;
-    timer_t timer;
     rc = ::timer_create(CLOCK_REALTIME, &se, &timer);
     assert(rc == 0);
     
@@ -169,11 +184,52 @@
     rc = ::timer_settime(timer, 0, &ts, 0);
     assert(rc == 0);
 
-    // wait 2 minutes then shutdown
-    ::sleep(60);
+    // wait
+    while (!phase1finished) {
+        ::sleep(1);
+    }
+
+    // Now test deleting/creating DispatchHandles in tight loop, so that we are likely to still be using the
+    // attached PollerHandles after deleting the DispatchHandle
+    DispatchHandleRef* t = wh;
+    wh = 0;
+    delete t;
+    t = rh;
+    rh = 0;
+    delete t;
+
+    sa.sa_sigaction = stop_handler;
+    rc = ::sigaction(SIGRTMIN, &sa,0);
+    assert(rc == 0);
+    
+    itimerspec nts = {
+    /*.it_value = */ {30, 0},  // s, ns
+    /*.it_interval = */ {30, 0}}; // s, ns
+    
+    rc = ::timer_create(CLOCK_REALTIME, &se, &timer);
+    assert(rc == 0);
+    rc = ::timer_settime(timer, 0, &nts, 0);
+    assert(rc == 0);
+
+    DispatchHandleRef* rh1;
+    DispatchHandleRef* wh1;
+
+    struct timespec w = {0, 1000000};
+    while (!stopWait) {
+        rh1 = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0);
+        wh1 = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);
+        rh1->startWatch(poller);
+        wh1->startWatch(poller);
+
+        ::nanosleep(&w, 0);
+
+        delete wh1;
+        delete rh1;
+    }
 
     rc = ::timer_delete(timer);
     assert(rc == 0);
+    
     poller->shutdown();
     dt.join();
     dt1.join();

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=749406&r1=749405&r2=749406&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Mar  2 18:42:02 2009
@@ -201,6 +201,7 @@
 check_PROGRAMS+=DispatcherTest
 DispatcherTest_SOURCES=DispatcherTest.cpp
 DispatcherTest_LDADD=$(lib_common)
+DispatcherTest_CXXFLAGS=$(AM_CXXFLAGS) -Wno-missing-field-initializers
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org