You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/07/01 17:21:57 UTC

svn commit: r790215 - in /qpid/trunk/qpid/cpp/src/tests: ClusterFailover.cpp ForkedBroker.cpp Makefile.am PartialFailure.cpp cluster_test.cpp

Author: kpvdr
Date: Wed Jul  1 15:21:54 2009
New Revision: 790215

URL: http://svn.apache.org/viewvc?rev=790215&view=rev
Log:
Fix for cluster_test problems: a) When not started from within test dir, qpidd is not found, and a process cascade results in which each forked process continues and runs all tests after its own instead of terminating; b) Hard-coded paths and names of libs, which have been moved into Makefile.am; c) Some tests use ScopedSuppressLogging, these tests are modified so the scope of the logging suppression does not include the broker/cluster startup.

Modified:
    qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
    qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp Wed Jul  1 15:21:54 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -51,7 +51,7 @@
 // Test re-connecting with same session name after a failure.
 QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
     ostringstream clusterLib;
-    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+    clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
     ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str());
     ClusterFixture cluster(2, args, -1);
     Client c0(cluster[0], "foo");

Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp Wed Jul  1 15:21:54 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,24 +50,24 @@
     catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
     }
-    if (!dataDir.empty()) 
+    if (!dataDir.empty())
         ::system(("rm -rf "+dataDir).c_str());
 }
 
 void ForkedBroker::kill(int sig) {
     if (pid == 0) return;
-    int savePid = pid;      
+    int savePid = pid;
     pid = 0;                // Reset pid here in case of an exception.
     using qpid::ErrnoException;
-    if (::kill(savePid, sig) < 0) 
+    if (::kill(savePid, sig) < 0)
             throw ErrnoException("kill failed");
     int status;
-    if (::waitpid(savePid, &status, 0) < 0 && sig != 9) 
+    if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
         throw ErrnoException("wait for forked process failed");
-    if (WEXITSTATUS(status) != 0 && sig != 9) 
+    if (WEXITSTATUS(status) != 0 && sig != 9)
         throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
 }
-        
+
 namespace std {
 static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
     copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
@@ -83,7 +83,7 @@
 }
 
 }
-        
+
 void ForkedBroker::init(const Args& userArgs) {
     using qpid::ErrnoException;
     port = 0;
@@ -105,19 +105,20 @@
         ::close(pipeFds[0]);
         int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
         if (fd < 0) throw ErrnoException("dup2 failed");
-        const char* prog = ::getenv("QPID_FORKED_BROKER");
-        if (!prog) prog = "../qpidd";
+        const char* prog = ::getenv("QPIDD_EXEC");
+        if (!prog) prog = "../qpidd"; // This only works from within svn checkout
         Args args(userArgs);
         args.push_back("--port=0");
         // Keep quiet except for errors.
         if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
             && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
-            args.push_back("--log-enable=error+"); 
+            args.push_back("--log-enable=error+");
         std::vector<const char*> argv(args.size());
         std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
         argv.push_back(0);
         QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
         execv(prog, const_cast<char* const*>(&argv[0]));
-        throw ErrnoException("execv failed");
+        QPID_LOG(critical, "execv failed to start broker: prog=\"" << prog << "\"; args=\"" << args << "\"; errno=" << errno << " (" << std::strerror(errno) << ")");
+        ::exit(1);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Jul  1 15:21:54 2009
@@ -244,7 +244,9 @@
     srcdir=$(srcdir) \
     top_builddir=$(top_builddir) \
     QPID_DATA_DIR= \
-    QPID_LIB_DIR=../.libs \
+    ACL_LIB=../.libs/acl.so \
+    CLUSTER_LIB=../.libs/cluster.so \
+    TEST_STORE_LIB=.libs/test_store.so \
     BOOST_TEST_SHOW_PROGRESS=yes \
     $(srcdir)/run_test 
 

Modified: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Wed Jul  1 15:21:54 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,8 +50,8 @@
 
 void updateArgs(ClusterFixture::Args& args, size_t index) {
     ostringstream clusterLib, testStoreLib, storeName;
-    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
-    testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so";
+    clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
+    testStoreLib << getLibPath("TEST_STORE_LIB", ".libs/test_store.so");
     storeName << "s" << index;
     args.push_back("--auth");
     args.push_back("no");
@@ -88,112 +88,120 @@
     // the statements expected to fail (in BOOST_CHECK_THROW) but that
     // sproadically lets out messages, possibly because they're in
     // Connection thread.
-    ScopedSuppressLogging allQuiet; 
 
-    ClusterFixture cluster(3, updateArgs, -1);    
+    ClusterFixture cluster(3, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
 
-    queueAndSub(c0);
-    c0.session.messageTransfer(content=Message("x", "c0"));
-    BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
-
-    // Session error.
-    BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
-    c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
-
-    // Connection error, kill c1 on all members.
-    queueAndSub(c1);
-    BOOST_CHECK_THROW(
-        c1.session.messageTransfer(
-            content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
-        ConnectionException);
-    c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
-
-    BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
-    BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
-    BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+    {
+        ScopedSuppressLogging allQuiet;
+        queueAndSub(c0);
+        c0.session.messageTransfer(content=Message("x", "c0"));
+        BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+        // Session error.
+        BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+        c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+        // Connection error, kill c1 on all members.
+        queueAndSub(c1);
+        BOOST_CHECK_THROW(
+            c1.session.messageTransfer(
+                content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+                ConnectionException);
+        c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+        BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+        BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+        BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+    }
 }
 
 
 // Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
 QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
-    ScopedSuppressLogging allQuiet;
-
     ClusterFixture cluster(1, updateArgs, -1);
     Client c0(cluster[0]);
-    c0.session.queueDeclare("q", durable=true);
-    c0.session.messageTransfer(content=pMessage("a", "q"));
+    {
+        ScopedSuppressLogging allQuiet;
 
-    // Kill the new guy
-    cluster.add();
-    Client c1(cluster[1]);
-    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
-    BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
-    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
-
-    // Kill the old guy
-    cluster.add();
-    Client c2(cluster[2]);
-    c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
-    BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+        c0.session.queueDeclare("q", durable=true);
+        c0.session.messageTransfer(content=pMessage("a", "q"));
 
-    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+        // Kill the new guy
+        cluster.add();
+        Client c1(cluster[1]);
+        c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+        BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+        BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+        // Kill the old guy
+        cluster.add();
+        Client c2(cluster[2]);
+        c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+        BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+        BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+    }
 }
 
-// Test that if one member fails and  others do not, the failure leaves the cluster. 
+// Test that if one member fails and  others do not, the failure leaves the cluster.
 QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
-    ScopedSuppressLogging allQuiet;
-
     ClusterFixture cluster(3, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
-    
-    c0.session.queueDeclare("q", durable=true);
-    c0.session.messageTransfer(content=pMessage("a", "q"));
-    // Cause partial failure on c1
-    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
-    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
-
-    c0.session.messageTransfer(content=pMessage("b", "q"));
-    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
-    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
-
-    // Cause partial failure on c2
-    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
-    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
-
-    c0.session.messageTransfer(content=pMessage("c", "q"));
-    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
-    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+    {
+        ScopedSuppressLogging allQuiet;
+
+        c0.session.queueDeclare("q", durable=true);
+        c0.session.messageTransfer(content=pMessage("a", "q"));
+        // Cause partial failure on c1
+        c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+        BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+        c0.session.messageTransfer(content=pMessage("b", "q"));
+        BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+        // Cause partial failure on c2
+        c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+        BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+        c0.session.messageTransfer(content=pMessage("c", "q"));
+        BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+        BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+    }
 }
 
-// Test multiple partial falures: 2 fail 2 pass 
+// Test multiple partial falures: 2 fail 2 pass
 QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
-    ScopedSuppressLogging allQuiet;
-
     ClusterFixture cluster(4, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
     Client c2(cluster[2], "c2");
     Client c3(cluster[3], "c3");
-    
-    c0.session.queueDeclare("q", durable=true);
-    c0.session.messageTransfer(content=pMessage("a", "q"));
-
-    // Cause partial failure on c1, c2
-    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
-    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
-    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
-
-    c0.session.messageTransfer(content=pMessage("b", "q"));
-    c3.session.messageTransfer(content=pMessage("c", "q"));
-    BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
-    // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3.
-    // It should pass reliably.
-    // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+    {
+        ScopedSuppressLogging allQuiet;
+
+        c0.session.queueDeclare("q", durable=true);
+        c0.session.messageTransfer(content=pMessage("a", "q"));
+
+        // Cause partial failure on c1, c2
+        c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+        BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+        BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+        c0.session.messageTransfer(content=pMessage("b", "q"));
+        c3.session.messageTransfer(content=pMessage("c", "q"));
+        BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+        // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3.
+        // It should pass reliably.
+        // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+    }
 }
 
 /** FIXME aconway 2009-04-10:
@@ -203,25 +211,27 @@
  */
 #if 0
 QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
-    ScopedSuppressLogging allQuiet;
-
     ClusterFixture cluster(2, updateArgs, -1);
     Client c0(cluster[0], "c0");
     Client c1(cluster[1], "c1");
 
-    c0.session.queueDeclare("q", durable=true);
-    c0.session.messageTransfer(content=pMessage("a", "q"));
+    {
+        ScopedSuppressLogging allQuiet;
 
-    // Cause failure on member 0 and simultaneous crash on member 1.
-    BOOST_CHECK_THROW(
-        c0.session.messageTransfer(
-            content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
-        ConnectionException);
-    cluster.wait(1);
-
-    Client c00(cluster[0], "c00"); // Old connection is dead.
-    BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
-    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+        c0.session.queueDeclare("q", durable=true);
+        c0.session.messageTransfer(content=pMessage("a", "q"));
+
+        // Cause failure on member 0 and simultaneous crash on member 1.
+        BOOST_CHECK_THROW(
+            c0.session.messageTransfer(
+                content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+                ConnectionException);
+        cluster.wait(1);
+
+        Client c00(cluster[0], "c00"); // Old connection is dead.
+        BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+        BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+    }
 }
 #endif
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul  1 15:21:54 2009
@@ -74,7 +74,7 @@
 
 void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
     ostringstream clusterLib;
-    clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+    clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
     args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
     if (durableFlag)
         args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
@@ -190,7 +190,7 @@
     static bool match(const AMQBody& , const AMQBody& ) { return false; }
     virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
 };
-    
+
 QPID_AUTO_TEST_CASE(testBadClientData) {
     // Ensure that bad data on a client connection closes the
     // connection but does not stop the broker.
@@ -227,7 +227,7 @@
     char cwd[1024];
     BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
     ostringstream aclLib;
-    aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+    aclLib << getLibPath("ACL_LIB", "../.libs/acl.so");
     ClusterFixture::Args args;
     prepareArgs(args, durableFlag);
     args += "--acl-file", string(cwd) + "/cluster_test.acl",
@@ -808,7 +808,7 @@
 
         void run()
         {
-            try { 
+            try {
             mgr.execute(*this);
         }
             catch (const std::exception& e) {
@@ -854,7 +854,6 @@
 }
 
 QPID_AUTO_TEST_CASE(testPolicyUpdate) {
-    ScopedSuppressLogging allQuiet;
     //tests that the policys internal state is accurate on newly
     //joined nodes
     ClusterFixture::Args args;
@@ -862,26 +861,28 @@
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c1(cluster[0], "c1");
-    QueueOptions options;
-    options.setSizePolicy(REJECT, 0, 2);
-    c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
-    c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
-    cluster.add();
-    Client c2(cluster[1], "c2");
-    c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
-
-    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
-
-    Message received;
-    BOOST_CHECK(c1.subs.get(received, "q"));
-    BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
-    BOOST_CHECK(c1.subs.get(received, "q"));
-    BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
-    BOOST_CHECK(!c1.subs.get(received, "q"));
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(REJECT, 0, 2);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+        cluster.add();
+        Client c2(cluster[1], "c2");
+        c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+        BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+        Message received;
+        BOOST_CHECK(c1.subs.get(received, "q"));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+        BOOST_CHECK(c1.subs.get(received, "q"));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+        BOOST_CHECK(!c1.subs.get(received, "q"));
+    }
 }
 
 QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
-    ScopedSuppressLogging allQuiet;
     //tests that exclusive queues are accurately replicated on newly
     //joined nodes
     ClusterFixture::Args args;
@@ -889,19 +890,22 @@
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c1(cluster[0], "c1");
-    c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
-    cluster.add();
-    Client c2(cluster[1], "c2");
-    QueueQueryResult result = c2.session.queueQuery("q");
-    BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
-    BOOST_CHECK(result.getExclusive());
-    BOOST_CHECK(result.getAutoDelete());
-    BOOST_CHECK(!result.getDurable());
-    BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
-    BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
-    c1.connection.close();
-    c2.session = c2.connection.newSession();
-    BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+    {
+        ScopedSuppressLogging allQuiet;
+        c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+        cluster.add();
+        Client c2(cluster[1], "c2");
+        QueueQueryResult result = c2.session.queueQuery("q");
+        BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+        BOOST_CHECK(result.getExclusive());
+        BOOST_CHECK(result.getAutoDelete());
+        BOOST_CHECK(!result.getDurable());
+        BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+        BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+        c1.connection.close();
+        c2.session = c2.connection.newSession();
+        BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+    }
 }
 
 /**
@@ -940,7 +944,6 @@
 }
 
 QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
-    ScopedSuppressLogging allQuiet;
     //tests that ring queues are accurately replicated on newly
     //joined nodes
     ClusterFixture::Args args;
@@ -948,24 +951,26 @@
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c1(cluster[0], "c1");
-    QueueOptions options;
-    options.setSizePolicy(RING, 0, 5);
-    c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
-    send(c1, "q", 5);
-    lockMessages(c1, "q", 1);
-    //add new node
-    cluster.add();
-    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
-    //send one more message
-    send(c1, "q", 1, 6);
-    //release locked message
-    c1.close();
-    //check state of queue on both nodes
-    checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(RING, 0, 5);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        send(c1, "q", 5);
+        lockMessages(c1, "q", 1);
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+        //send one more message
+        send(c1, "q", 1, 6);
+        //release locked message
+        c1.close();
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    }
 }
 
 QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
-    ScopedSuppressLogging allQuiet;
     //tests that ring queues are accurately replicated on newly joined
     //nodes; just like testRingQueueUpdate, but new node joins after
     //the sixth message has been sent.
@@ -974,24 +979,26 @@
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c1(cluster[0], "c1");
-    QueueOptions options;
-    options.setSizePolicy(RING, 0, 5);
-    c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
-    send(c1, "q", 5);
-    lockMessages(c1, "q", 1);
-    //send sixth message
-    send(c1, "q", 1, 6);
-    //add new node
-    cluster.add();
-    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
-    //release locked message
-    c1.close();
-    //check state of queue on both nodes
-    checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(RING, 0, 5);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        send(c1, "q", 5);
+        lockMessages(c1, "q", 1);
+        //send sixth message
+        send(c1, "q", 1, 6);
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+        //release locked message
+        c1.close();
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    }
 }
 
 QPID_AUTO_TEST_CASE(testRelease) {
-    ScopedSuppressLogging allQuiet;
     //tests that releasing a messages that was unacked when one node
     //joined works correctly
     ClusterFixture::Args args;
@@ -999,31 +1006,34 @@
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c1(cluster[0], "c1");
-    c1.session.queueDeclare("q", arg::durable=durableFlag);
-    for (int i = 0; i < 5; i++) {
-        c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+    {
+        ScopedSuppressLogging allQuiet;
+        c1.session.queueDeclare("q", arg::durable=durableFlag);
+        for (int i = 0; i < 5; i++) {
+            c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+        }
+        //receive but don't ack a message
+        LocalQueue lq;
+        SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+        lqSettings.autoAck = 0;
+        Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+        c1.session.messageFlush("q");
+        Message received;
+        BOOST_CHECK(lq.get(received));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+        //add new node
+        cluster.add();
+
+        lqSub.release(lqSub.getUnaccepted());
+
+        //check state of queue on both nodes
+        vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+        Client c3(cluster[0], "c3");
+        BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+        Client c2(cluster[1], "c2");
+        BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
     }
-    //receive but don't ack a message
-    LocalQueue lq;
-    SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
-    lqSettings.autoAck = 0;
-    Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
-    c1.session.messageFlush("q");
-    Message received;
-    BOOST_CHECK(lq.get(received));
-    BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
-
-    //add new node
-    cluster.add();
-
-    lqSub.release(lqSub.getUnaccepted());
-
-    //check state of queue on both nodes
-    vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
-    Client c3(cluster[0], "c3");
-    BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
-    Client c2(cluster[1], "c2");
-    BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
 }
 
 QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org