You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/04 17:03:03 UTC

svn commit: r711283 - /incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Author: aconway
Date: Tue Nov  4 08:03:03 2008
New Revision: 711283

URL: http://svn.apache.org/viewvc?rev=711283&view=rev
Log:
Allow local broker to be run in any position in a ClusterFixture.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711283&r1=711282&r2=711283&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Nov  4 08:03:03 2008
@@ -35,7 +35,7 @@
 #include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <string>
 #include <iostream>
@@ -47,6 +47,7 @@
 
 namespace qpid {
 namespace cluster {
+// FIXME aconway 2008-11-04: remove.
 Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
 }} // namespace qpid::cluster
 
@@ -66,7 +67,7 @@
 using namespace qpid::client;
 using qpid::sys::TIME_SEC;
 using qpid::broker::Broker;
-using boost::ptr_vector;
+using boost::shared_ptr;
 using qpid::cluster::Cluster;
 using qpid::cluster::getGlobalCluster;
 
@@ -79,24 +80,34 @@
 }
 
 /** Cluster fixture is a vector of ports for the replicas.
- * Replica 0 is in the current process, all others are forked as children.
+ * 
+ * At most one replica (by default replica 0) is in the current
+ * process, all others are forked as children.
  */
-struct ClusterFixture : public vector<uint16_t>  {
+class ClusterFixture : public vector<uint16_t>  {
     string name;
-    std::auto_ptr<BrokerFixture> broker0;
-    boost::ptr_vector<ForkedBroker> forkedBrokers;
-    bool init0;
+    std::auto_ptr<BrokerFixture> localBroker;
+    int localIndex;
+    std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
 
-    ClusterFixture(size_t n, bool init0=true);
+  public:
+    /** @param localIndex can be -1 meaning don't automatically start a local broker.
+     * A local broker can be started with addLocal().
+     */
+    ClusterFixture(size_t n, int localIndex=0);
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
-    void add();
-    void add0(bool force);
+    void add();                 // Add a broker.
+    void addLocal();            // Add a local broker.
     void setup();
 
-    /** Kill a forked broker with sig, or shutdown broker0 if n==0. */
+    bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
+    
+    /** Kill forked broker n with sig, or shut down localBroker if n == localIndex. */
     void kill(size_t n, int sig=SIGINT) {
-        if (n) forkedBrokers[n-1].kill(sig);
-        else broker0->broker->shutdown();
+        if (n == size_t(localIndex))
+            localBroker->broker->shutdown();
+        else
+            forkedBrokers[n]->kill(sig);
     }
 
     /** Kill a broker and suppress errors from connection. */
@@ -105,51 +116,34 @@
         kill(n,sig);
         try { c.close(); } catch(...) {}
     }
-
-    void waitFor(size_t n) {
-        for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry)
-            ::usleep(1000);
-    }
 };
 
-ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
+ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) {
     add(n);
-    if (!init0) return;  // Defer initialization of broker0
-    // Wait for all n members to join the cluster
-    waitFor(n);
-    BOOST_REQUIRE_EQUAL(n, getGlobalCluster().getUrls().size());
 }
 
 void ClusterFixture::add() {
-    std::ostringstream os;
-    os << "fork" << size();
-    std::string prefix = os.str();
-
-    if (size())  {              // Not the first broker, fork.
-
+    if (size() != size_t(localIndex))  { // fork a broker process.
+        std::ostringstream os; os << "fork" << size(); const std::string prefix = os.str(); // named: argv stores a raw pointer into it
         const char* argv[] = {
             "qpidd " __FILE__ ,
             "--load-module=../.libs/cluster.so",
             "--cluster-name", name.c_str(), 
             "--auth=no", "--no-data-dir",
             "--log-prefix", prefix.c_str(),
         };
         size_t argc = sizeof(argv)/sizeof(argv[0]);
-
-
-        forkedBrokers.push_back(new ForkedBroker(argc, argv));
-        push_back(forkedBrokers.back().getPort());
+        forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv)));
+        push_back(forkedBrokers.back()->getPort());
     }
-    else {      
-        add0(init0);            // First broker, run in this process.
+    else {                      // Run in this process
+        addLocal();
     }
 }
 
-void ClusterFixture::add0(bool init) {
-    if (!init) {
-        push_back(0);
-        return;
-    }
+void ClusterFixture::addLocal() {
+    assert(int(size()) == localIndex || localIndex == -1);
+    localIndex = size();
     const char* argv[] = {
         "qpidd " __FILE__ ,
         "--load-module=../.libs/cluster.so",
@@ -157,10 +151,11 @@
         "--auth=no", "--no-data-dir"
     };
     size_t argc = sizeof(argv)/sizeof(argv[0]);
-
-    qpid::log::Logger::instance().setPrefix("main");
-    broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
-    if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
+    ostringstream os; os << "local" << localIndex;
+    qpid::log::Logger::instance().setPrefix(os.str());
+    localBroker.reset(new BrokerFixture(parseOpts(argc, argv)));
+    push_back(localBroker->getPort());
+    forkedBrokers.push_back(shared_ptr<ForkedBroker>());
 }
 
 ostream& operator<<(ostream& o, const cpg_name* n) {
@@ -188,14 +183,15 @@
 
 template <class T>  std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
     vector<Url> urls = source.getKnownBrokers();
-    BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls);
-    if (n >= 0) {
-        for (size_t retry=10; urls.size() != unsigned(n) && retry != 0; --retry) {
-            ::usleep(100000);
+    if (n >= 0 && unsigned(n) != urls.size()) {
+        BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
+        // Retry up to 10 secs in .1 second intervals.
+        for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+            ::usleep(1000*100); // 0.1 secs
             urls = source.getKnownBrokers();
-            BOOST_MESSAGE("knownBrokerPorts retry: " << urls);
         }
     }
+    BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
     set<uint16_t> s;
     for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) 
         s.insert((*i)[0].get<TcpAddress>()->port);
@@ -266,7 +262,6 @@
 
     // Add new member while there are unacked messages.
     cluster.add();
-    cluster.waitFor(2);
     Client c1(cluster[1], "c1"); 
 
     // Check queue counts
@@ -336,7 +331,7 @@
 
     // No reliable way to ensure the partial message has arrived
     // before we start the new broker, so we sleep.
-    ::usleep(250); 
+    ::usleep(2500); 
     cluster.add();
 
     // Send final 2 frames of message.
@@ -378,7 +373,6 @@
     BOOST_CHECK_EQUAL(kb2,kb1);
 
     cluster.killWithSilencer(1,c1.connection,9);
-    cluster.waitFor(2);
     kb0 = knownBrokerPorts(c0.connection, 2);
     kb2 = knownBrokerPorts(c2.connection, 2);
     BOOST_CHECK_EQUAL(kb0.size(), 2u);
@@ -386,58 +380,54 @@
 }
 
 QPID_AUTO_TEST_CASE(DumpConsumers) {
-    ClusterFixture cluster(1, false); // Don't init broker 0
+    ClusterFixture cluster(1, 1);  
 
-    cluster.add();
-    Client c1(cluster[1], "c1"); 
-    c1.session.queueDeclare("p");
-    c1.session.queueDeclare("q");
-    c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+    Client c0(cluster[0], "c0"); 
+    c0.session.queueDeclare("p");
+    c0.session.queueDeclare("q");
+    c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
     LocalQueue lp;
-    c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
-    c1.session.sync();
+    c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+    c0.session.sync();
 
     // Start new members
-    cluster.add0(true);
-    Client c0(cluster[0], "c0"); 
+    cluster.add();              // Local
+    Client c1(cluster[1], "c1"); 
     cluster.add();
     Client c2(cluster[2], "c2"); 
 
     // Transfer messages
-    c1.session.messageTransfer(arg::content=Message("aaa", "q"));
-    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
-    BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
+    c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+
+    c0.session.messageTransfer(arg::content=Message("bbb", "p"));
+    c0.session.messageTransfer(arg::content=Message("ccc", "p"));
 
-    c1.session.messageTransfer(arg::content=Message("bbb", "p"));
-    c1.session.messageTransfer(arg::content=Message("ccc", "p"));
-    
     // Activate the subscription, ensure message removed on all queues. 
-    c1.subs.setFlowControl("q", FlowControl::unlimited());
+    c0.subs.setFlowControl("q", FlowControl::unlimited());
     Message m;
-    BOOST_CHECK(c1.lq.get(m, TIME_SEC));
+    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "aaa");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
 
-    // Check second subscription's flow control: getsnn first message, not second.
+    // Check second subscription's flow control: gets first message, not second.
     BOOST_CHECK(lp.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "bbb");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
 
-    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "ccc");
     
     // Kill the subscribing member, ensure further messages are not removed.
-    cluster.killWithSilencer(1,c1.connection,9);
-    cluster.waitFor(2);
+    cluster.killWithSilencer(0,c0.connection,9);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2);
     for (int i = 0; i < 10; ++i) {
-        c0.session.messageTransfer(arg::content=Message("bbb", "q"));
-        BOOST_REQUIRE(c0.subs.get(m, "q", TIME_SEC));
-        BOOST_REQUIRE_EQUAL(m.getData(), "bbb");
+        c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+        BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+        BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
     }
 }
 
@@ -450,7 +440,7 @@
     c0.session.messageTransfer(arg::content=Message("foo","q"));
     c0.session.messageTransfer(arg::content=Message("bar","q"));
     while (c0.session.queueQuery("q").getMessageCount() != 2)
-        ::usleep(1000);         // Wait for message to show up on broker 0.
+        ::usleep(1000);    // Wait for message to show up on broker 0.
 
     // Add a new broker, it should catch up.
     cluster.add();
@@ -460,7 +450,7 @@
     c0.session.messageTransfer(arg::content=Message("pfoo","p"));
 
     // Do some work post-join
-    cluster.waitFor(2);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2);
     c0.session.messageTransfer(arg::content=Message("pbar","p"));
 
     // Verify new brokers have state.
@@ -545,10 +535,13 @@
 
 QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
     ClusterFixture cluster(3);
-    // First start a subscription.
     Client c0(cluster[0]);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+    // First start a subscription.
     c0.session.queueDeclare("q");
     c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
     // Now send messages
     Client c1(cluster[1]);
     c1.session.messageTransfer(arg::content=Message("foo", "q"));