You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/01 21:17:59 UTC

svn commit: r770796 - in /qpid/trunk/qpid/cpp/src/tests: ClusterFailover.cpp ClusterFixture.cpp ClusterFixture.h Makefile.am PartialFailure.cpp cluster_test.cpp test_tools.h

Author: kpvdr
Date: Fri May  1 19:17:59 2009
New Revision: 770796

URL: http://svn.apache.org/viewvc?rev=770796&view=rev
Log:
Cluster test code now has a persistence switch controlled by the environment. When this switch set, all brokers start with the store module loaded, all queues are declared persistent and all messages are also made persistent. The absolute paths to module libs hardcoded into the test fixtures have been replaced by paths relative to environment variable QPID_LIB_DIR (which is set in Makefile.am). The cluster test, when run from qpid, will continue to run without persistence by default; the intention is to have the store test code run this test directly with the switch turned on.

Modified:
    qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/src/tests/test_tools.h

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp Fri May  1 19:17:59 2009
@@ -50,7 +50,10 @@
 
 // Test re-connecting with same session name after a failure.
 QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
-    ClusterFixture cluster(2, -1);
+    ostringstream clusterLib;
+    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+    ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str());
+    ClusterFixture cluster(2, args, -1);
     Client c0(cluster[0], "foo");
     cluster.kill(0, 9);
     Client c1(cluster[1], "foo"); // Using same name, should be cleaned up.

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Fri May  1 19:17:59 2009
@@ -61,25 +61,20 @@
 
 #include "ClusterFixture.h"
 
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_)
-    : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_)
+ClusterFixture::ClusterFixture(size_t n, const Args& args_, int localIndex_)
+    : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_)
 {
     add(n);
 }
 
-ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_)
-    : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_)
+ClusterFixture::ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs_, int localIndex_)
+    : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_)
 {
     add(n);
 }
 
-const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
-    list_of<string>("--auth=no")("--no-data-dir");
-
 ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
     Args args = list_of<string>("qpidd ")
-        ("--no-module-dir")
-        ("--load-module")(clusterLib)
         ("--cluster-name")(name)
         ("--log-prefix")(prefix);
     args.insert(args.end(), userArgs.begin(), userArgs.end());

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h Fri May  1 19:17:59 2009
@@ -60,8 +60,6 @@
 using boost::shared_ptr;
 using qpid::cluster::Cluster;
 
-#define DEFAULT_CLUSTER_LIB "../.libs/cluster.so"
-
 /** Cluster fixture is a vector of ports for the replicas.
  *
  * At most one replica (by default replica 0) is in the current
@@ -70,15 +68,14 @@
 class ClusterFixture : public vector<uint16_t>  {
   public:
     typedef std::vector<std::string> Args;
-    static const Args DEFAULT_ARGS;
 
     /** @param localIndex can be -1 meaning don't automatically start a local broker.
      * A local broker can be started with addLocal().
      */
-    ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+    ClusterFixture(size_t n, const Args& args, int localIndex=0);
 
     /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
-    ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs, const string& clusterLib = DEFAULT_CLUSTER_LIB);
+    ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex);
 
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();                 // Add a broker.
@@ -102,7 +99,6 @@
     std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
     Args userArgs;
     boost::function<void (Args&, size_t)> updateArgs;
-    string clusterLib;
 };
 
 /**

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri May  1 19:17:59 2009
@@ -211,7 +211,13 @@
 DispatcherTest_SOURCES=DispatcherTest.cpp
 DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS)
 
-TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
+TESTS_ENVIRONMENT = \
+    VALGRIND=$(VALGRIND) \
+    srcdir=$(srcdir) \
+    QPID_DATA_DIR= \
+    QPID_LIB_DIR=../.libs \
+    BOOST_TEST_SHOW_PROGRESS=yes \
+    $(srcdir)/run_test 
 
 system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
 TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test

Modified: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Fri May  1 19:17:59 2009
@@ -33,7 +33,7 @@
 
 QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
 
-    using namespace std;
+using namespace std;
 using namespace qpid;
 using namespace qpid::cluster;
 using namespace qpid::framing;
@@ -49,11 +49,19 @@
 static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
 
 void updateArgs(ClusterFixture::Args& args, size_t index) {
-    ostringstream os;
-    os << "--test-store-name=s" << index;
-    args.push_back(os.str());
-    args.push_back("--load-module=.libs/test_store.so");
-    args.push_back("--auth=no");
+    ostringstream clusterLib, testStoreLib, storeName;
+    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+    testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so";
+    storeName << "s" << index;
+    args.push_back("--auth");
+    args.push_back("no");
+    args.push_back("--no-module-dir");
+    args.push_back("--load-module");
+    args.push_back(clusterLib.str());
+    args.push_back("--load-module");
+    args.push_back(testStoreLib.str());
+    args.push_back("--test-store-name");
+    args.push_back(storeName.str());
     args.push_back("TMP_DATA_DIR");
 
     // These tests generate errors deliberately, disable error logging unless a log env var is set.
@@ -82,7 +90,7 @@
     // Connection thread.
     ScopedSuppressLogging allQuiet; 
 
-    ClusterFixture cluster(3, -1, updateArgs);    
+    ClusterFixture cluster(3, updateArgs, -1);    
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
@@ -113,7 +121,7 @@
 QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
     ScopedSuppressLogging allQuiet;
 
-    ClusterFixture cluster(1, -1, updateArgs);
+    ClusterFixture cluster(1, updateArgs, -1);
     Client c0(cluster[0]);
     c0.session.queueDeclare("q", durable=true);
     c0.session.messageTransfer(content=pMessage("a", "q"));
@@ -138,7 +146,7 @@
 QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
     ScopedSuppressLogging allQuiet;
 
-    ClusterFixture cluster(3, -1, updateArgs);
+    ClusterFixture cluster(3, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
@@ -166,7 +174,7 @@
 QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
     ScopedSuppressLogging allQuiet;
 
-    ClusterFixture cluster(4, -1, updateArgs);
+    ClusterFixture cluster(4, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
@@ -195,7 +203,7 @@
 QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
     ScopedSuppressLogging allQuiet;
 
-    ClusterFixture cluster(2, -1, updateArgs);
+    ClusterFixture cluster(2, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri May  1 19:17:59 2009
@@ -69,6 +69,18 @@
 using broker::Broker;
 using boost::shared_ptr;
 
+bool durableFlag = std::getenv("DURABLE_ENABLE") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+    ostringstream clusterLib;
+    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+    args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+    if (durableFlag)
+        args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+    else
+        args += "--no-data-dir";
+}
+
 // Timeout for tests that wait for messages
 const sys::Duration TIMEOUT=sys::TIME_SEC/4;
 
@@ -166,29 +178,31 @@
     policyFile.close();
     char cwd[1024];
     BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
-    ClusterFixture cluster(2,-1, list_of<string>
-                           ("--no-data-dir")
-                           ("--auth=no")
-                           ("--acl-file="+string(cwd)+"/cluster_test.acl")
-                           ("--cluster-mechanism=PLAIN")
-                           ("--cluster-username=cluster")
-                           ("--cluster-password=cluster")
-                           ("--load-module=../.libs/acl.so"));
+    ostringstream aclLib;
+    aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    args += "--acl-file", string(cwd) + "/cluster_test.acl",
+            "--cluster-mechanism", "PLAIN",
+            "--cluster-username", "cluster",
+            "--cluster-password", "cluster",
+            "--load-module", aclLib.str();
+    ClusterFixture cluster(2, args, -1);
 
     Client c0(aclSettings(cluster[0], "c0"), "c0");
     Client c1(aclSettings(cluster[1], "c1"), "c1");
     Client foo(aclSettings(cluster[1], "foo"), "foo");
 
-    foo.session.queueDeclare("foo");
+    foo.session.queueDeclare("foo", arg::durable=durableFlag);
     BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
 
-    BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+    BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
     BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
     BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
 
     cluster.add();
     Client c2(aclSettings(cluster[2], "c2"), "c2");
-    BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException);
+    BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::NotAllowedException);
     BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
 }
 
@@ -198,15 +212,17 @@
     // Note: this doesn't actually test for cluster race conditions around TTL,
     // it just verifies that basic TTL functionality works.
     //
-    ClusterFixture cluster(2);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
-    c0.session.queueDeclare("p");
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
-    c0.session.messageTransfer(arg::content=Message("b", "q"));
-    c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
-    c0.session.messageTransfer(arg::content=Message("y", "p"));
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("y", "p"), arg::durable=durableFlag);
     cluster.add();
     Client c2(cluster[1], "c2");
 
@@ -222,44 +238,48 @@
 
 QPID_AUTO_TEST_CASE(testSequenceOptions) {
     // Make sure the exchange qpid.msg_sequence property is properly replicated.
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
-    FieldTable args;
-    args.setInt("qpid.msg_sequence", 1); 
-    c0.session.queueDeclare(arg::queue="q");
-    c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+    FieldTable ftargs;
+    ftargs.setInt("qpid.msg_sequence", 1); 
+    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+    c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
     c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
-    c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
-    c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+    c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=durableFlag);
     BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
     BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
 
     cluster.add();
     Client c1(cluster[1]);
-    c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");    
+    c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex", arg::durable=durableFlag);    
     BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
 }
 
 QPID_AUTO_TEST_CASE(testTxTransaction) {
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
-    c0.session.queueDeclare(arg::queue="q");
-    c0.session.messageTransfer(arg::content=Message("A", "q"));
-    c0.session.messageTransfer(arg::content=Message("B", "q"));
+    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
 
     // Start a transaction that will commit.
     Session commitSession = c0.connection.newSession("commit");
     SubscriptionManager commitSubs(commitSession);
     commitSession.txSelect();
-    commitSession.messageTransfer(arg::content=Message("a", "q"));
-    commitSession.messageTransfer(arg::content=Message("b", "q"));
+    commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=durableFlag);
+    commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=durableFlag);
     BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
 
     // Start a transaction that will roll back.
     Session rollbackSession = c0.connection.newSession("rollback");
     SubscriptionManager rollbackSubs(rollbackSession);
     rollbackSession.txSelect();
-    rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+    rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=durableFlag);
     Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
     BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
 
@@ -270,9 +290,9 @@
 
     // More transactional work
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
-    rollbackSession.messageTransfer(arg::content=Message("2", "q"));
-    commitSession.messageTransfer(arg::content=Message("c", "q"));
-    rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+    rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=durableFlag);
+    commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=durableFlag);
+    rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=durableFlag);
 
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);    
 
@@ -292,15 +312,17 @@
 
 QPID_AUTO_TEST_CASE(testUnacked) {
     // Verify replication of unacknowledged messages.
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0"); 
 
     Message m;
 
     // Create unacked message: acquired but not accepted.
     SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
-    c0.session.queueDeclare("q1");
-    c0.session.messageTransfer(arg::content=Message("11","q1"));
+    c0.session.queueDeclare("q1", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=durableFlag);
     LocalQueue q1;
     c0.subs.subscribe(q1, "q1", manualAccept);
     BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
@@ -308,9 +330,9 @@
 
     // Create unacked message: not acquired, accepted or completeed.
     SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
-    c0.session.queueDeclare("q2");
-    c0.session.messageTransfer(arg::content=Message("21","q2"));
-    c0.session.messageTransfer(arg::content=Message("22","q2"));
+    c0.session.queueDeclare("q2", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=durableFlag);
     LocalQueue q2;
     c0.subs.subscribe(q2, "q2", manualAcquire);
     m = q2.get(TIMEOUT);  // Not acquired or accepted, still on queue
@@ -323,9 +345,9 @@
 
     // Create empty credit record: acquire and accept but don't complete.
     SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
-    c0.session.queueDeclare("q3");
-    c0.session.messageTransfer(arg::content=Message("31", "q3"));
-    c0.session.messageTransfer(arg::content=Message("32", "q3"));
+    c0.session.queueDeclare("q3", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=durableFlag);
     LocalQueue q3;
     c0.subs.subscribe(q3, "q3", manualComplete);
     Message m31=q3.get(TIMEOUT);
@@ -360,14 +382,16 @@
 
 QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
     // Verify that we update transaction state correctly to new members.
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
 
     // Do work in a transaction.
     c0.session.txSelect();
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=Message("1","q"));
-    c0.session.messageTransfer(arg::content=Message("2","q"));
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=durableFlag);
     Message m;
     BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "1");
@@ -384,7 +408,7 @@
     BOOST_CHECK_EQUAL(m.getData(), "2");
 
     // Another transaction with both members active.
-    c0.session.messageTransfer(arg::content=Message("3","q"));
+    c0.session.messageTransfer(arg::content=Message("3","q"), arg::durable=durableFlag);
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
     c0.session.txCommit();
     BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
@@ -394,9 +418,11 @@
 
 QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
     // Verify that we update a partially recieved message to a new member.
-    ClusterFixture cluster(1);    
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);    
     Client c0(cluster[0], "c0");
-    c0.session.queueDeclare("q");
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
     Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
 
     // Send first 2 frames of message.
@@ -407,6 +433,10 @@
     sender.send(transfer, true, false, true, true);
     AMQHeaderBody header;
     header.get<DeliveryProperties>(true)->setRoutingKey("q");
+    if (durableFlag)
+        header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+    else
+        header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
     sender.send(header, false, false, true, true);
 
     // No reliable way to ensure the partial message has arrived
@@ -427,7 +457,9 @@
 }
 
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
     set<int> kb0 = knownBrokerPorts(c0.connection);
     BOOST_CHECK_EQUAL(kb0.size(), 1u);
@@ -459,11 +491,13 @@
 }
 
 QPID_AUTO_TEST_CASE(testUpdateConsumers) {
-    ClusterFixture cluster(1, 1);  
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);  
 
     Client c0(cluster[0], "c0"); 
-    c0.session.queueDeclare("p");
-    c0.session.queueDeclare("q");
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
     c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
     LocalQueue lp;
     c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
@@ -476,10 +510,10 @@
     Client c2(cluster[2], "c2"); 
 
     // Transfer messages
-    c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+    c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=durableFlag);
 
-    c0.session.messageTransfer(arg::content=Message("bbb", "p"));
-    c0.session.messageTransfer(arg::content=Message("ccc", "p"));
+    c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=durableFlag);
 
     // Activate the subscription, ensure message removed on all queues. 
     c0.subs.setFlowControl("q", FlowControl::unlimited());
@@ -504,20 +538,22 @@
     cluster.killWithSilencer(0,c0.connection,9);
     BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
     for (int i = 0; i < 10; ++i) {
-        c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+        c1.session.messageTransfer(arg::content=Message("xxx", "q"), arg::durable=durableFlag);
         BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
         BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
     }
 }
 
 QPID_AUTO_TEST_CASE(testCatchupSharedState) {
-    ClusterFixture cluster(1);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
 
     // Create some shared state.
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=Message("foo","q"));
-    c0.session.messageTransfer(arg::content=Message("bar","q"));
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=durableFlag);
 
     while (c0.session.queueQuery("q").getMessageCount() != 2)
         sys::usleep(1000);    // Wait for message to show up on broker 0.
@@ -526,12 +562,12 @@
     cluster.add();
 
     // Do some work post-add
-    c0.session.queueDeclare("p");
-    c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=durableFlag);
 
     // Do some work post-join
     BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
-    c0.session.messageTransfer(arg::content=Message("pbar","p"));
+    c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=durableFlag);
 
     // Verify new brokers have state.
     Message m;
@@ -556,11 +592,13 @@
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {
-    ClusterFixture cluster(3);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
     Client c0(cluster[0]);
     BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
     BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); 
-    c0.session.queueDeclare("q");
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
     c0.session.exchangeDeclare("ex", arg::type="direct");
     c0.session.close();
     c0.connection.close();
@@ -575,11 +613,13 @@
 
 QPID_AUTO_TEST_CASE(testMessageEnqueue) {
     // Enqueue on one broker, dequeue on another.
-    ClusterFixture cluster(2);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
     Client c0(cluster[0]);
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=Message("foo", "q"));
-    c0.session.messageTransfer(arg::content=Message("bar", "q"));
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
     c0.session.close();
     Client c1(cluster[1]);
     Message msg;
@@ -591,11 +631,13 @@
 
 QPID_AUTO_TEST_CASE(testMessageDequeue) {
     // Enqueue on one broker, dequeue on two others.
-    ClusterFixture cluster(3);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
     Client c0(cluster[0], "c0");
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=Message("foo", "q"));
-    c0.session.messageTransfer(arg::content=Message("bar", "q"));
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
 
     Message msg;
 
@@ -615,18 +657,20 @@
 }
 
 QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
-    ClusterFixture cluster(3);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
     Client c0(cluster[0]);
     BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
 
     // First start a subscription.
-    c0.session.queueDeclare("q");
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
     c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
 
     // Now send messages
     Client c1(cluster[1]);
-    c1.session.messageTransfer(arg::content=Message("foo", "q"));
-    c1.session.messageTransfer(arg::content=Message("bar", "q"));
+    c1.session.messageTransfer(arg::content=Message("foo", "q"), arg::durable=durableFlag);
+    c1.session.messageTransfer(arg::content=Message("bar", "q"), arg::durable=durableFlag);
 
     // Check they arrived
     Message m;
@@ -653,7 +697,7 @@
 
         void execute(AsyncSession& session, bool)
         {
-            session.messageTransfer(arg::content=Message(content, queue));
+            session.messageTransfer(arg::content=Message(content, queue), arg::durable=durableFlag);
         }
     };
 
@@ -676,7 +720,7 @@
 
         void execute(AsyncSession& session, bool)
         {
-            session.queueDeclare(arg::queue=queue);
+            session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
             SubscriptionManager subs(session);
             subscription = subs.subscribe(*this, queue);
             session.sync();
@@ -707,7 +751,9 @@
         }
     };
 
-    ClusterFixture cluster(2);
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
     ConnectionSettings settings;
     settings.port = cluster[1];
     settings.heartbeat = 1;

Modified: qpid/trunk/qpid/cpp/src/tests/test_tools.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_tools.h?rev=770796&r1=770795&r2=770796&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_tools.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_tools.h Fri May  1 19:17:59 2009
@@ -89,6 +89,15 @@
     qpid::log::Options opts;
 };
 
+inline std::string getLibPath(const char* envName, const char* defaultPath = 0) {
+    const char* p = std::getenv(envName);
+    if (p != 0)
+        return p;
+    if (defaultPath == 0)
+        BOOST_FAIL("Environment variable " << envName << " not set.");
+    return defaultPath;
+}
+
 
 #endif  /*!TEST_TOOLS_H*/
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Re: [c++]: arg::durable=durableFlag has no effect in messageTransfer()

Posted by Kim van der Riet <ki...@redhat.com>.
On Wed, 2009-05-27 at 14:27 +0100, Gordon Sim wrote:
> kpvdr@apache.org wrote:
> > Author: kpvdr
> > Date: Fri May  1 19:17:59 2009
> > New Revision: 770796
> > 
> > URL: http://svn.apache.org/viewvc?rev=770796&view=rev
> 
> Kim,
> 
> Revisiting the cluster tests recently I noticed all the 
> messageTransfer() requests had an additional argument (e.g. as below) 
> and tracked this back to your earlier commit.
> 
> However messageTransfer() does not take a durable argument so this 
> addition will have no effect. To make the messages persistent you need 
> to set the delivery mode on the delivery properties.
> 
> --Gordon.
> 
> > -    c0.session.queueDeclare(arg::queue="q");
> > -    c0.session.messageTransfer(arg::content=Message("A", "q"));
> > -    c0.session.messageTransfer(arg::content=Message("B", "q"));
> > +    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
> > +    c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
> > +    c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);
> 
Thanks for pointing this out. I'll fix it right away. I checked the
store files, and they are empty.

Kim



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


[c++]: arg::durable=durableFlag has no effect in messageTransfer()

Posted by Gordon Sim <gs...@redhat.com>.
kpvdr@apache.org wrote:
> Author: kpvdr
> Date: Fri May  1 19:17:59 2009
> New Revision: 770796
> 
> URL: http://svn.apache.org/viewvc?rev=770796&view=rev

Kim,

Revisiting the cluster tests recently I noticed all the 
messageTransfer() requests had an additional argument (e.g. as below) 
and tracked this back to your earlier commit.

However messageTransfer() does not take a durable argument so this 
addition will have no effect. To make the messages persistent you need 
to set the delivery mode on the delivery properties.

--Gordon.

> -    c0.session.queueDeclare(arg::queue="q");
> -    c0.session.messageTransfer(arg::content=Message("A", "q"));
> -    c0.session.messageTransfer(arg::content=Message("B", "q"));
> +    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
> +    c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=durableFlag);
> +    c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=durableFlag);

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org