You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/18 23:33:15 UTC
svn commit: r945899 - in /qpid/trunk/qpid/cpp/src:
qpid/sys/epoll/EpollPoller.cpp tests/PollerTest.cpp
Author: astitcher
Date: Tue May 18 21:33:15 2010
New Revision: 945899
URL: http://svn.apache.org/viewvc?rev=945899&view=rev
Log:
Fix the behaviour of the EpollPoller when shutdowns and interrupts interact
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=945899&r1=945898&r2=945899&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue May 18 21:33:15 2010
@@ -536,6 +536,12 @@ Poller::Event Poller::wait(Duration time
// Check if this is an interrupt
PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
if (dataPtr == &interruptHandle) {
+ // If we are shutting down we need to rearm the shutdown interrupt to
+ // ensure everyone still sees it. It's okay that this might be overridden
+ // below as we will be back here if it is.
+ if (impl->isShutdown) {
+ impl->interruptAll();
+ }
PollerHandle* wrappedHandle = 0;
{
ScopedLock<Mutex> l(interruptHandle.impl->lock);
Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=945899&r1=945898&r2=945899&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue May 18 21:33:15 2010
@@ -69,20 +69,24 @@ int readALot(int fd) {
return bytesRead;
}
+void makesocketpair(int (&sv)[2]) {
+ int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
+ assert(rc >= 0);
+
+ // Set non-blocking
+ rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+
+ rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+}
+
int main(int /*argc*/, char** /*argv*/)
{
try
{
int sv[2];
- int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
- assert(rc >= 0);
-
- // Set non-blocking
- rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
- assert(rc >= 0);
-
- rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
- assert(rc >= 0);
+ makesocketpair(sv);
// Make up a large string
string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
@@ -92,16 +96,13 @@ int main(int /*argc*/, char** /*argv*/)
// Read as much as we can from socket 0
int bytesRead = readALot(sv[0]);
assert(bytesRead == 0);
- cout << "Read(0): " << bytesRead << " bytes\n";
// Write as much as we can to socket 0
int bytesWritten = writeALot(sv[0], testString);
- cout << "Wrote(0): " << bytesWritten << " bytes\n";
// Read as much as we can from socket 1
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
- cout << "Read(1): " << bytesRead << " bytes\n";
auto_ptr<Poller> poller(new Poller);
@@ -121,7 +122,6 @@ int main(int /*argc*/, char** /*argv*/)
// Write as much as we can to socket 0
bytesWritten = writeALot(sv[0], testString);
- cout << "Wrote(0): " << bytesWritten << " bytes\n";
// Wait for 500ms - h0 no longer writable
event = poller->wait(500000000);
@@ -136,7 +136,6 @@ int main(int /*argc*/, char** /*argv*/)
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
- cout << "Read(1): " << bytesRead << " bytes\n";
// Test poller interrupt
assert(poller->interrupt(h0) == true);
@@ -218,6 +217,43 @@ int main(int /*argc*/, char** /*argv*/)
assert(event.handle == 0);
assert(event.type == Poller::SHUTDOWN);
+ ::close(sv[0]);
+
+ // Test for correct interaction of shutdown and interrupts - need to have new poller
+ // etc. for this
+ makesocketpair(sv);
+
+ auto_ptr<Poller> poller1(new Poller);
+
+ PosixIOHandle f2(sv[0]);
+ PosixIOHandle f3(sv[1]);
+
+ PollerHandle h2(f2);
+ PollerHandle h3(f3);
+
+ poller1->registerHandle(h2);
+ poller1->monitorHandle(h2, Poller::INOUT);
+ event = poller1->wait();
+ assert(event.handle == &h2);
+ assert(event.type == Poller::WRITABLE);
+
+ // Shutdown
+ poller1->shutdown();
+ event = poller1->wait();
+ assert(event.handle == 0);
+ assert(event.type == Poller::SHUTDOWN);
+
+ assert(poller1->interrupt(h2) == true);
+ event = poller1->wait();
+ assert(event.handle == &h2);
+ assert(event.type == Poller::INTERRUPTED);
+ poller1->unmonitorHandle(h2, Poller::INOUT);
+
+ event = poller1->wait();
+ assert(event.handle == 0);
+ assert(event.type == Poller::SHUTDOWN);
+
+ poller1->unregisterHandle(h2);
return 0;
} catch (exception& e) {
cout << "Caught exception " << e.what() << "\n";
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org