You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/18 23:33:15 UTC

svn commit: r945899 - in /qpid/trunk/qpid/cpp/src: qpid/sys/epoll/EpollPoller.cpp tests/PollerTest.cpp

Author: astitcher
Date: Tue May 18 21:33:15 2010
New Revision: 945899

URL: http://svn.apache.org/viewvc?rev=945899&view=rev
Log:
Fix the behaviour of the EpollPoller when shutdowns and interrupts interact

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=945899&r1=945898&r2=945899&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue May 18 21:33:15 2010
@@ -536,6 +536,12 @@ Poller::Event Poller::wait(Duration time
             // Check if this is an interrupt
             PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
             if (dataPtr == &interruptHandle) {
+                // If we are shutting down we need to rearm the shutdown interrupt to
+                // ensure everyone still sees it. It's okay that this might be overridden
+                // below as we will be back here if it is.
+                if (impl->isShutdown) {
+                    impl->interruptAll();
+                }
                 PollerHandle* wrappedHandle = 0;
                 {
                 ScopedLock<Mutex> l(interruptHandle.impl->lock);

Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=945899&r1=945898&r2=945899&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue May 18 21:33:15 2010
@@ -69,20 +69,24 @@ int readALot(int fd) {
     return bytesRead;
 }
 
+void makesocketpair(int (&sv)[2]) {
+    int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
+    assert(rc >= 0);
+
+    // Set non-blocking
+    rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+    assert(rc >= 0);
+
+    rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+    assert(rc >= 0);
+}
+
 int main(int /*argc*/, char** /*argv*/)
 {
     try
     {
         int sv[2];
-        int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
-        assert(rc >= 0);
-
-        // Set non-blocking
-        rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
-        assert(rc >= 0);
-
-        rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
-        assert(rc >= 0);
+        makesocketpair(sv);
 
         // Make up a large string
         string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
@@ -92,16 +96,13 @@ int main(int /*argc*/, char** /*argv*/)
         // Read as much as we can from socket 0
         int bytesRead = readALot(sv[0]);
         assert(bytesRead == 0);
-        cout << "Read(0): " << bytesRead << " bytes\n";
 
         // Write as much as we can to socket 0
         int bytesWritten = writeALot(sv[0], testString);
-        cout << "Wrote(0): " << bytesWritten << " bytes\n";
 
         // Read as much as we can from socket 1
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
-        cout << "Read(1): " << bytesRead << " bytes\n";
 
         auto_ptr<Poller> poller(new Poller);
 
@@ -121,7 +122,6 @@ int main(int /*argc*/, char** /*argv*/)
 
         // Write as much as we can to socket 0
         bytesWritten = writeALot(sv[0], testString);
-        cout << "Wrote(0): " << bytesWritten << " bytes\n";
 
         // Wait for 500ms - h0 no longer writable
         event = poller->wait(500000000);
@@ -136,7 +136,6 @@ int main(int /*argc*/, char** /*argv*/)
 
         bytesRead = readALot(sv[1]);
         assert(bytesRead == bytesWritten);
-        cout << "Read(1): " << bytesRead << " bytes\n";
 
         // Test poller interrupt
         assert(poller->interrupt(h0) == true);
@@ -218,6 +217,43 @@ int main(int /*argc*/, char** /*argv*/)
         assert(event.handle == 0);
         assert(event.type == Poller::SHUTDOWN);
 
+        ::close(sv[0]);
+
+	// Test for correct interaction of shutdown and interrupts - need to have new poller
+	// etc. for this
+        makesocketpair(sv);
+	
+        auto_ptr<Poller> poller1(new Poller);
+
+        PosixIOHandle f2(sv[0]);
+        PosixIOHandle f3(sv[1]);
+
+        PollerHandle h2(f2);
+        PollerHandle h3(f3);
+
+        poller1->registerHandle(h2);
+        poller1->monitorHandle(h2, Poller::INOUT);
+        event = poller1->wait();
+        assert(event.handle == &h2);
+        assert(event.type == Poller::WRITABLE);
+
+	// Shutdown
+        poller1->shutdown();
+        event = poller1->wait();
+        assert(event.handle == 0);
+        assert(event.type == Poller::SHUTDOWN);
+        
+        assert(poller1->interrupt(h2) == true);
+        event = poller1->wait();
+        assert(event.handle == &h2);
+        assert(event.type == Poller::INTERRUPTED);
+        poller1->unmonitorHandle(h2, Poller::INOUT);
+
+        event = poller1->wait();
+        assert(event.handle == 0);
+        assert(event.type == Poller::SHUTDOWN);
+
+        poller1->unregisterHandle(h2);
         return 0;
     } catch (exception& e) {
         cout << "Caught exception  " << e.what() << "\n";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org