You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/04 17:03:03 UTC
svn commit: r711283 -
/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Author: aconway
Date: Tue Nov 4 08:03:03 2008
New Revision: 711283
URL: http://svn.apache.org/viewvc?rev=711283&view=rev
Log:
Allow local broker to be run in any position in a ClusterFixture.
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711283&r1=711282&r2=711283&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Nov 4 08:03:03 2008
@@ -35,7 +35,7 @@
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/shared_ptr.hpp>
#include <string>
#include <iostream>
@@ -47,6 +47,7 @@
namespace qpid {
namespace cluster {
+// FIXME aconway 2008-11-04: remove.
Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
@@ -66,7 +67,7 @@
using namespace qpid::client;
using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
-using boost::ptr_vector;
+using boost::shared_ptr;
using qpid::cluster::Cluster;
using qpid::cluster::getGlobalCluster;
@@ -79,24 +80,34 @@
}
/** Cluster fixture is a vector of ports for the replicas.
- * Replica 0 is in the current process, all others are forked as children.
+ *
+ * At most one replica (by default replica 0) is in the current
+ * process, all others are forked as children.
*/
-struct ClusterFixture : public vector<uint16_t> {
+class ClusterFixture : public vector<uint16_t> {
string name;
- std::auto_ptr<BrokerFixture> broker0;
- boost::ptr_vector<ForkedBroker> forkedBrokers;
- bool init0;
+ std::auto_ptr<BrokerFixture> localBroker;
+ int localIndex;
+ std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
- ClusterFixture(size_t n, bool init0=true);
+ public:
+ /** @param localIndex can be -1 meaning don't automatically start a local broker.
+ * A local broker can be started with addLocal().
+ */
+ ClusterFixture(size_t n, int localIndex=0);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
- void add();
- void add0(bool force);
+ void add(); // Add a broker.
+ void addLocal(); // Add a local broker.
void setup();
- /** Kill a forked broker with sig, or shutdown broker0 if n==0. */
+ bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
+
+ /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
void kill(size_t n, int sig=SIGINT) {
- if (n) forkedBrokers[n-1].kill(sig);
- else broker0->broker->shutdown();
+ if (n == size_t(localIndex))
+ localBroker->broker->shutdown();
+ else
+ forkedBrokers[n]->kill(sig);
}
/** Kill a broker and suppress errors from connection. */
@@ -105,51 +116,34 @@
kill(n,sig);
try { c.close(); } catch(...) {}
}
-
- void waitFor(size_t n) {
- for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry)
- ::usleep(1000);
- }
};
-ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
+ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) {
add(n);
- if (!init0) return; // Defer initialization of broker0
- // Wait for all n members to join the cluster
- waitFor(n);
- BOOST_REQUIRE_EQUAL(n, getGlobalCluster().getUrls().size());
}
void ClusterFixture::add() {
- std::ostringstream os;
- os << "fork" << size();
- std::string prefix = os.str();
-
- if (size()) { // Not the first broker, fork.
-
+ if (size() != size_t(localIndex)) { // fork a broker process.
+ std::ostringstream os; os << "fork" << size();
const char* argv[] = {
"qpidd " __FILE__ ,
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
- "--log-prefix", prefix.c_str(),
+ "--log-prefix", os.str().c_str(),
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
-
-
- forkedBrokers.push_back(new ForkedBroker(argc, argv));
- push_back(forkedBrokers.back().getPort());
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv)));
+ push_back(forkedBrokers.back()->getPort());
}
- else {
- add0(init0); // First broker, run in this process.
+ else { // Run in this process
+ addLocal();
}
}
-void ClusterFixture::add0(bool init) {
- if (!init) {
- push_back(0);
- return;
- }
+void ClusterFixture::addLocal() {
+ assert(int(size()) == localIndex || localIndex == -1);
+ localIndex = size();
const char* argv[] = {
"qpidd " __FILE__ ,
"--load-module=../.libs/cluster.so",
@@ -157,10 +151,11 @@
"--auth=no", "--no-data-dir"
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
-
- qpid::log::Logger::instance().setPrefix("main");
- broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
- if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
+ ostringstream os; os << "local" << localIndex;
+ qpid::log::Logger::instance().setPrefix(os.str());
+ localBroker.reset(new BrokerFixture(parseOpts(argc, argv)));
+ push_back(localBroker->getPort());
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>());
}
ostream& operator<<(ostream& o, const cpg_name* n) {
@@ -188,14 +183,15 @@
template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
vector<Url> urls = source.getKnownBrokers();
- BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls);
- if (n >= 0) {
- for (size_t retry=10; urls.size() != unsigned(n) && retry != 0; --retry) {
- ::usleep(100000);
+ if (n >= 0 && unsigned(n) != urls.size()) {
+ BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
+ // Retry up to 10 secs in .1 second intervals.
+ for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+ ::usleep(1000*100); // 0.1 secs
urls = source.getKnownBrokers();
- BOOST_MESSAGE("knownBrokerPorts retry: " << urls);
}
}
+ BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
set<uint16_t> s;
for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
s.insert((*i)[0].get<TcpAddress>()->port);
@@ -266,7 +262,6 @@
// Add new member while there are unacked messages.
cluster.add();
- cluster.waitFor(2);
Client c1(cluster[1], "c1");
// Check queue counts
@@ -336,7 +331,7 @@
// No reliable way to ensure the partial message has arrived
// before we start the new broker, so we sleep.
- ::usleep(250);
+ ::usleep(2500);
cluster.add();
// Send final 2 frames of message.
@@ -378,7 +373,6 @@
BOOST_CHECK_EQUAL(kb2,kb1);
cluster.killWithSilencer(1,c1.connection,9);
- cluster.waitFor(2);
kb0 = knownBrokerPorts(c0.connection, 2);
kb2 = knownBrokerPorts(c2.connection, 2);
BOOST_CHECK_EQUAL(kb0.size(), 2u);
@@ -386,58 +380,54 @@
}
QPID_AUTO_TEST_CASE(DumpConsumers) {
- ClusterFixture cluster(1, false); // Don't init broker 0
+ ClusterFixture cluster(1, 1);
- cluster.add();
- Client c1(cluster[1], "c1");
- c1.session.queueDeclare("p");
- c1.session.queueDeclare("q");
- c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("p");
+ c0.session.queueDeclare("q");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
LocalQueue lp;
- c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
- c1.session.sync();
+ c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+ c0.session.sync();
// Start new members
- cluster.add0(true);
- Client c0(cluster[0], "c0");
+ cluster.add(); // Local
+ Client c1(cluster[1], "c1");
cluster.add();
Client c2(cluster[2], "c2");
// Transfer messages
- c1.session.messageTransfer(arg::content=Message("aaa", "q"));
- BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
- BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+
+ c0.session.messageTransfer(arg::content=Message("bbb", "p"));
+ c0.session.messageTransfer(arg::content=Message("ccc", "p"));
- c1.session.messageTransfer(arg::content=Message("bbb", "p"));
- c1.session.messageTransfer(arg::content=Message("ccc", "p"));
-
// Activate the subscription, ensure message removed on all queues.
- c1.subs.setFlowControl("q", FlowControl::unlimited());
+ c0.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
- BOOST_CHECK(c1.lq.get(m, TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
- // Check second subscription's flow control: getsnn first message, not second.
+ // Check second subscription's flow control: gets first message, not second.
BOOST_CHECK(lp.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bbb");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "ccc");
// Kill the subscribing member, ensure further messages are not removed.
- cluster.killWithSilencer(1,c1.connection,9);
- cluster.waitFor(2);
+ cluster.killWithSilencer(0,c0.connection,9);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2);
for (int i = 0; i < 10; ++i) {
- c0.session.messageTransfer(arg::content=Message("bbb", "q"));
- BOOST_REQUIRE(c0.subs.get(m, "q", TIME_SEC));
- BOOST_REQUIRE_EQUAL(m.getData(), "bbb");
+ c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
@@ -450,7 +440,7 @@
c0.session.messageTransfer(arg::content=Message("foo","q"));
c0.session.messageTransfer(arg::content=Message("bar","q"));
while (c0.session.queueQuery("q").getMessageCount() != 2)
- ::usleep(1000); // Wait for message to show up on broker 0.
+ ::usleep(1000); // Wait for message to show up on broker 0.
// Add a new broker, it should catch up.
cluster.add();
@@ -460,7 +450,7 @@
c0.session.messageTransfer(arg::content=Message("pfoo","p"));
// Do some work post-join
- cluster.waitFor(2);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2);
c0.session.messageTransfer(arg::content=Message("pbar","p"));
// Verify new brokers have state.
@@ -545,10 +535,13 @@
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
ClusterFixture cluster(3);
- // First start a subscription.
Client c0(cluster[0]);
+ BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+ // First start a subscription.
c0.session.queueDeclare("q");
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
// Now send messages
Client c1(cluster[1]);
c1.session.messageTransfer(arg::content=Message("foo", "q"));