You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/03/02 15:27:02 UTC
svn commit: r749313 - /qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Author: gsim
Date: Mon Mar 2 14:27:01 2009
New Revision: 749313
URL: http://svn.apache.org/viewvc?rev=749313&view=rev
Log:
QPID-1705: added unit test for heartbeat firing after connection has failed over
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=749313&r1=749312&r2=749313&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Mar 2 14:27:01 2009
@@ -27,6 +27,7 @@
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
+#include "qpid/client/FailoverManager.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/UpdateClient.h"
@@ -35,6 +36,8 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -628,4 +631,86 @@
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) // Heartbeat-enabled client: subscribe, kill the broker, wait, then send and receive one message.
+{
+ struct Sender : FailoverManager::Command // Command that transfers a single message when executed.
+ {
+ std::string queue; // Second argument given to Message() in execute().
+ std::string content; // First argument given to Message() in execute().
+ // Constructor: stores q and c for later use by execute().
+ Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+ // Run through fmgr.execute(sender) at the end of the test; the bool argument is unused.
+ void execute(AsyncSession& session, bool)
+ {
+ session.messageTransfer(arg::content=Message(content, queue));
+ }
+ };
+ // Receiver is at once the command, the message listener and the body of the background thread.
+ struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+ {
+ FailoverManager& mgr; // Used by run() to execute this object as a command.
+ std::string queue; // Declared, subscribed to and finally deleted in execute().
+ std::string expectedContent; // Compared with message.getData() in received().
+ qpid::client::Subscription subscription; // Assigned in execute(), cancelled in received().
+ qpid::sys::Monitor lock; // Held while reading/writing 'ready'; also used to wait and notify.
+ bool ready; // Starts false; set to true by setReady().
+ // Constructor: stores its arguments and starts in the not-ready state.
+ Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false) {}
+ // Listener callback: checks the payload, then cancels the subscription.
+ void received(Message& message)
+ {
+ BOOST_CHECK_EQUAL(expectedContent, message.getData());
+ subscription.cancel(); // NOTE(review): presumably this is what lets subs.run() return - confirm.
+ }
+ // Declares the queue, subscribes, signals readiness, processes messages, then deletes the queue.
+ void execute(AsyncSession& session, bool)
+ {
+ session.queueDeclare(arg::queue=queue);
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, queue); // This object is the listener, see received().
+ session.sync();
+ setReady(); // Release the test thread only after sync() has returned.
+ subs.run(); // NOTE(review): presumably blocks until the subscription is cancelled - confirm.
+ //cleanup:
+ session.queueDelete(arg::queue=queue);
+ }
+ // Runnable entry point: hands this object to the failover manager as a command.
+ void run()
+ {
+ mgr.execute(*this);
+ }
+ // Called by the test thread: blocks until setReady() has been called.
+ void waitForReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ while (!ready) { // Re-check the flag after every wake-up.
+ lock.wait();
+ }
+ }
+ // Called from execute(): sets the flag under the lock and notifies the waiter.
+ void setReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ ready = true;
+ lock.notify();
+ }
+ };
+ // Test body. The only explicit check is the content comparison in Receiver::received().
+ ClusterFixture cluster(2); // NOTE(review): 2 is presumably the number of brokers - confirm against ClusterFixture.
+ ConnectionSettings settings;
+ settings.port = cluster[1]; // Same index that is passed to cluster.kill() below.
+ settings.heartbeat = 1; // NOTE(review): presumably seconds, given the 2 sec sleep below - confirm.
+ FailoverManager fmgr(settings);
+ Sender sender("my-queue", "my-data"); // Same queue name and content as the receiver expects.
+ Receiver receiver(fmgr, "my-queue", "my-data");
+ qpid::sys::Thread runner(receiver); // NOTE(review): presumably starts receiver.run() on a new thread - confirm.
+ receiver.waitForReady(); // Do not kill the broker before the subscription is in place.
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000); // usleep() takes microseconds.
+ fmgr.execute(sender); // Send the message that the receiver is waiting for.
+ runner.join(); // Wait for the receiver thread to finish.
+ fmgr.close();
+}
+
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org