You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/24 14:37:53 UTC
svn commit: r1535355 -
/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Author: gsim
Date: Thu Oct 24 12:37:53 2013
New Revision: 1535355
URL: http://svn.apache.org/r1535355
Log:
QPID-4265: test closing of receiver with concurrent fetch
Modified:
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1535355&r1=1535354&r2=1535355&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Thu Oct 24 12:37:53 2013
@@ -1243,11 +1243,18 @@ struct Fetcher : public qpid::sys::Runna
Receiver receiver;
Message message;
bool result;
+ qpid::messaging::Duration timeout;
+ bool timedOut;
- Fetcher(Receiver r) : receiver(r), result(false) {}
+ Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {}
void run()
{
- result = receiver.fetch(message, Duration::SECOND*10);
+ qpid::sys::AbsTime start(qpid::sys::now());
+ try {
+ result = receiver.fetch(message, timeout);
+ } catch (const MessagingException&) {}
+ qpid::sys::Duration timeTaken(start, qpid::sys::now());
+ timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC;
}
};
}
@@ -1387,6 +1394,44 @@ QPID_AUTO_TEST_CASE(testRollbackWithFull
txsession.commit();
}
+QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch) // closing a receiver while another thread is in fetch() on it must not leave that fetch waiting out its timeout
+{
+ QueueFixture fix;
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Fetcher fetcher(receiver); // Fetcher::run() calls receiver.fetch() with a 10 second timeout
+ qpid::sys::Thread runner(fetcher); // presumably starts fetcher.run() on a new thread -- confirm against qpid::sys::Thread
+ ::usleep(500); // 500 microseconds; presumably meant to let the fetch begin before the close, but this ordering is not guaranteed
+ receiver.close();
+ runner.join(); // wait for run() to finish so fetcher.timedOut is final before it is checked
+ BOOST_CHECK(!fetcher.timedOut); // timedOut is set when fetch() took at least the full timeout, i.e. close() did not release it
+}
+
+QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches) // closing one receiver mid-fetch must not stop the session's other receivers from fetching
+{
+ QueueFixture fix;
+ Receiver receiver = fix.session.createReceiver(fix.queue); // the receiver that gets closed while its fetch is pending
+ Receiver receiver2 = fix.session.createReceiver("amq.fanout"); // receiver2 and receiver3 stay open and should each get the message sent below
+ Receiver receiver3 = fix.session.createReceiver("amq.fanout");
+ Fetcher fetcher(receiver);
+ Fetcher fetcher2(receiver2);
+ Fetcher fetcher3(receiver3);
+ qpid::sys::Thread runner(fetcher); // presumably each Thread runs the matching Fetcher::run(), i.e. one fetch() per receiver -- confirm
+ qpid::sys::Thread runner2(fetcher2);
+ qpid::sys::Thread runner3(fetcher3);
+ ::usleep(500); // 500 microseconds; presumably meant to let the fetches begin first, but this ordering is not guaranteed
+ receiver.close(); // only the queue receiver is closed; the two amq.fanout receivers remain open
+ Message message("Test");
+ fix.session.createSender("amq.fanout").send(message); // sent after the close, so the checks below cover delivery following the close
+ runner2.join();
+ BOOST_CHECK(fetcher2.result); // fetch() on receiver2 must have returned true
+ BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent());
+ runner3.join();
+ BOOST_CHECK(fetcher3.result); // same expectation for receiver3
+ BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent());
+ runner.join(); // joined last; the closed receiver's fetch must have ended before its 10 second timeout
+ BOOST_CHECK(!fetcher.timedOut);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org