You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/24 14:37:53 UTC

svn commit: r1535355 - /qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Author: gsim
Date: Thu Oct 24 12:37:53 2013
New Revision: 1535355

URL: http://svn.apache.org/r1535355
Log:
QPID-4265: test closing of receiver with concurrent fetch

Modified:
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1535355&r1=1535354&r2=1535355&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Thu Oct 24 12:37:53 2013
@@ -1243,11 +1243,18 @@ struct Fetcher : public qpid::sys::Runna
     Receiver receiver;
     Message message;
     bool result;
+    qpid::messaging::Duration timeout;
+    bool timedOut;
 
-    Fetcher(Receiver r) : receiver(r), result(false) {}
+    Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {}
     void run()
     {
-        result = receiver.fetch(message, Duration::SECOND*10);
+        qpid::sys::AbsTime start(qpid::sys::now());
+        try {
+            result = receiver.fetch(message, timeout);
+        } catch (const MessagingException&) {}
+        qpid::sys::Duration timeTaken(start, qpid::sys::now());
+        timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC;
     }
 };
 }
@@ -1387,6 +1394,44 @@ QPID_AUTO_TEST_CASE(testRollbackWithFull
     txsession.commit();
 }
 
+QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch)
+{
+    QueueFixture fix;
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Fetcher fetcher(receiver);
+    qpid::sys::Thread runner(fetcher);
+    ::usleep(500);
+    receiver.close();
+    runner.join();
+    BOOST_CHECK(!fetcher.timedOut);
+}
+
+QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches)
+{
+    QueueFixture fix;
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    Receiver receiver2 = fix.session.createReceiver("amq.fanout");
+    Receiver receiver3 = fix.session.createReceiver("amq.fanout");
+    Fetcher fetcher(receiver);
+    Fetcher fetcher2(receiver2);
+    Fetcher fetcher3(receiver3);
+    qpid::sys::Thread runner(fetcher);
+    qpid::sys::Thread runner2(fetcher2);
+    qpid::sys::Thread runner3(fetcher3);
+    ::usleep(500);
+    receiver.close();
+    Message message("Test");
+    fix.session.createSender("amq.fanout").send(message);
+    runner2.join();
+    BOOST_CHECK(fetcher2.result);
+    BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent());
+    runner3.join();
+    BOOST_CHECK(fetcher3.result);
+    BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent());
+    runner.join();
+    BOOST_CHECK(!fetcher.timedOut);
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org