You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/07/01 17:21:57 UTC
svn commit: r790215 - in /qpid/trunk/qpid/cpp/src/tests: ClusterFailover.cpp
ForkedBroker.cpp Makefile.am PartialFailure.cpp cluster_test.cpp
Author: kpvdr
Date: Wed Jul 1 15:21:54 2009
New Revision: 790215
URL: http://svn.apache.org/viewvc?rev=790215&view=rev
Log:
Fix for cluster_test problems: a) When not started from within test dir, qpidd is not found, and a process cascade results in which each forked process continues and runs all tests after its own instead of terminating; b) Hard-coded paths and names of libs, which have been moved into Makefile.am; c) Some tests use ScopedSuppressLogging, these tests are modified so the scope of the logging suppression does not include the broker/cluster startup.
Modified:
qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFailover.cpp Wed Jul 1 15:21:54 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -51,7 +51,7 @@
// Test re-connecting with same session name after a failure.
QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
ostringstream clusterLib;
- clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str());
ClusterFixture cluster(2, args, -1);
Client c0(cluster[0], "foo");
Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp Wed Jul 1 15:21:54 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,24 +50,24 @@
catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
}
- if (!dataDir.empty())
+ if (!dataDir.empty())
::system(("rm -rf "+dataDir).c_str());
}
void ForkedBroker::kill(int sig) {
if (pid == 0) return;
- int savePid = pid;
+ int savePid = pid;
pid = 0; // Reset pid here in case of an exception.
using qpid::ErrnoException;
- if (::kill(savePid, sig) < 0)
+ if (::kill(savePid, sig) < 0)
throw ErrnoException("kill failed");
int status;
- if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
+ if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
throw ErrnoException("wait for forked process failed");
- if (WEXITSTATUS(status) != 0 && sig != 9)
+ if (WEXITSTATUS(status) != 0 && sig != 9)
throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
}
-
+
namespace std {
static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
@@ -83,7 +83,7 @@
}
}
-
+
void ForkedBroker::init(const Args& userArgs) {
using qpid::ErrnoException;
port = 0;
@@ -105,19 +105,20 @@
::close(pipeFds[0]);
int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
if (fd < 0) throw ErrnoException("dup2 failed");
- const char* prog = ::getenv("QPID_FORKED_BROKER");
- if (!prog) prog = "../qpidd";
+ const char* prog = ::getenv("QPIDD_EXEC");
+ if (!prog) prog = "../qpidd"; // This only works from within svn checkout
Args args(userArgs);
args.push_back("--port=0");
// Keep quiet except for errors.
if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
&& find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
- args.push_back("--log-enable=error+");
+ args.push_back("--log-enable=error+");
std::vector<const char*> argv(args.size());
std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
argv.push_back(0);
QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
execv(prog, const_cast<char* const*>(&argv[0]));
- throw ErrnoException("execv failed");
+ QPID_LOG(critical, "execv failed to start broker: prog=\"" << prog << "\"; args=\"" << args << "\"; errno=" << errno << " (" << std::strerror(errno) << ")");
+ ::exit(1);
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Jul 1 15:21:54 2009
@@ -244,7 +244,9 @@
srcdir=$(srcdir) \
top_builddir=$(top_builddir) \
QPID_DATA_DIR= \
- QPID_LIB_DIR=../.libs \
+ ACL_LIB=../.libs/acl.so \
+ CLUSTER_LIB=../.libs/cluster.so \
+ TEST_STORE_LIB=.libs/test_store.so \
BOOST_TEST_SHOW_PROGRESS=yes \
$(srcdir)/run_test
Modified: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Wed Jul 1 15:21:54 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,8 +50,8 @@
void updateArgs(ClusterFixture::Args& args, size_t index) {
ostringstream clusterLib, testStoreLib, storeName;
- clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
- testStoreLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/../tests/.libs/test_store.so";
+ clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
+ testStoreLib << getLibPath("TEST_STORE_LIB", ".libs/test_store.so");
storeName << "s" << index;
args.push_back("--auth");
args.push_back("no");
@@ -88,112 +88,120 @@
// the statements expected to fail (in BOOST_CHECK_THROW) but that
// sproadically lets out messages, possibly because they're in
// Connection thread.
- ScopedSuppressLogging allQuiet;
- ClusterFixture cluster(3, updateArgs, -1);
+ ClusterFixture cluster(3, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
- queueAndSub(c0);
- c0.session.messageTransfer(content=Message("x", "c0"));
- BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
-
- // Session error.
- BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
- c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
-
- // Connection error, kill c1 on all members.
- queueAndSub(c1);
- BOOST_CHECK_THROW(
- c1.session.messageTransfer(
- content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
- ConnectionException);
- c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
-
- BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
- BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
- BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+ {
+ ScopedSuppressLogging allQuiet;
+ queueAndSub(c0);
+ c0.session.messageTransfer(content=Message("x", "c0"));
+ BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+ // Session error.
+ BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+ c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+ // Connection error, kill c1 on all members.
+ queueAndSub(c1);
+ BOOST_CHECK_THROW(
+ c1.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+ ConnectionException);
+ c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+ BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+ BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+ BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+ }
}
// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
- ScopedSuppressLogging allQuiet;
-
ClusterFixture cluster(1, updateArgs, -1);
Client c0(cluster[0]);
- c0.session.queueDeclare("q", durable=true);
- c0.session.messageTransfer(content=pMessage("a", "q"));
+ {
+ ScopedSuppressLogging allQuiet;
- // Kill the new guy
- cluster.add();
- Client c1(cluster[1]);
- c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
- BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
- BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
-
- // Kill the old guy
- cluster.add();
- Client c2(cluster[2]);
- c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
- BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
- BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+ // Kill the new guy
+ cluster.add();
+ Client c1(cluster[1]);
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+ BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+ // Kill the old guy
+ cluster.add();
+ Client c2(cluster[2]);
+ c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+ BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+ }
}
-// Test that if one member fails and others do not, the failure leaves the cluster.
+// Test that if one member fails and others do not, the failure leaves the cluster.
QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
- ScopedSuppressLogging allQuiet;
-
ClusterFixture cluster(3, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
-
- c0.session.queueDeclare("q", durable=true);
- c0.session.messageTransfer(content=pMessage("a", "q"));
- // Cause partial failure on c1
- c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
- BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
-
- c0.session.messageTransfer(content=pMessage("b", "q"));
- BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
- BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
-
- // Cause partial failure on c2
- c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
- BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
-
- c0.session.messageTransfer(content=pMessage("c", "q"));
- BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
- BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+ {
+ ScopedSuppressLogging allQuiet;
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+ // Cause partial failure on c1
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+ // Cause partial failure on c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+ }
}
-// Test multiple partial falures: 2 fail 2 pass
+// Test multiple partial falures: 2 fail 2 pass
QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
- ScopedSuppressLogging allQuiet;
-
ClusterFixture cluster(4, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
Client c2(cluster[2], "c2");
Client c3(cluster[3], "c3");
-
- c0.session.queueDeclare("q", durable=true);
- c0.session.messageTransfer(content=pMessage("a", "q"));
-
- // Cause partial failure on c1, c2
- c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
- BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
- BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
-
- c0.session.messageTransfer(content=pMessage("b", "q"));
- c3.session.messageTransfer(content=pMessage("c", "q"));
- BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
- // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3.
- // It should pass reliably.
- // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+ {
+ ScopedSuppressLogging allQuiet;
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause partial failure on c1, c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ c3.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+ // FIXME aconway 2009-06-30: This check fails sporadically with 2 != 3.
+ // It should pass reliably.
+ // BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+ }
}
/** FIXME aconway 2009-04-10:
@@ -203,25 +211,27 @@
*/
#if 0
QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
- ScopedSuppressLogging allQuiet;
-
ClusterFixture cluster(2, updateArgs, -1);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
- c0.session.queueDeclare("q", durable=true);
- c0.session.messageTransfer(content=pMessage("a", "q"));
+ {
+ ScopedSuppressLogging allQuiet;
- // Cause failure on member 0 and simultaneous crash on member 1.
- BOOST_CHECK_THROW(
- c0.session.messageTransfer(
- content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
- ConnectionException);
- cluster.wait(1);
-
- Client c00(cluster[0], "c00"); // Old connection is dead.
- BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
- BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause failure on member 0 and simultaneous crash on member 1.
+ BOOST_CHECK_THROW(
+ c0.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+ ConnectionException);
+ cluster.wait(1);
+
+ Client c00(cluster[0], "c00"); // Old connection is dead.
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+ }
}
#endif
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=790215&r1=790214&r2=790215&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul 1 15:21:54 2009
@@ -74,7 +74,7 @@
void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
ostringstream clusterLib;
- clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
+ clusterLib << getLibPath("CLUSTER_LIB", "../.libs/cluster.so");
args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
if (durableFlag)
args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
@@ -190,7 +190,7 @@
static bool match(const AMQBody& , const AMQBody& ) { return false; }
virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
};
-
+
QPID_AUTO_TEST_CASE(testBadClientData) {
// Ensure that bad data on a client connection closes the
// connection but does not stop the broker.
@@ -227,7 +227,7 @@
char cwd[1024];
BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
ostringstream aclLib;
- aclLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/acl.so";
+ aclLib << getLibPath("ACL_LIB", "../.libs/acl.so");
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
args += "--acl-file", string(cwd) + "/cluster_test.acl",
@@ -808,7 +808,7 @@
void run()
{
- try {
+ try {
mgr.execute(*this);
}
catch (const std::exception& e) {
@@ -854,7 +854,6 @@
}
QPID_AUTO_TEST_CASE(testPolicyUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that the policys internal state is accurate on newly
//joined nodes
ClusterFixture::Args args;
@@ -862,26 +861,28 @@
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(REJECT, 0, 2);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
- cluster.add();
- Client c2(cluster[1], "c2");
- c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
-
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
-
- Message received;
- BOOST_CHECK(c1.subs.get(received, "q"));
- BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
- BOOST_CHECK(c1.subs.get(received, "q"));
- BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
- BOOST_CHECK(!c1.subs.get(received, "q"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(REJECT, 0, 2);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+ Message received;
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+ BOOST_CHECK(!c1.subs.get(received, "q"));
+ }
}
QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that exclusive queues are accurately replicated on newly
//joined nodes
ClusterFixture::Args args;
@@ -889,19 +890,22 @@
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
- cluster.add();
- Client c2(cluster[1], "c2");
- QueueQueryResult result = c2.session.queueQuery("q");
- BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
- BOOST_CHECK(result.getExclusive());
- BOOST_CHECK(result.getAutoDelete());
- BOOST_CHECK(!result.getDurable());
- BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
- BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
- c1.connection.close();
- c2.session = c2.connection.newSession();
- BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ QueueQueryResult result = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+ BOOST_CHECK(result.getExclusive());
+ BOOST_CHECK(result.getAutoDelete());
+ BOOST_CHECK(!result.getDurable());
+ BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+ c1.connection.close();
+ c2.session = c2.connection.newSession();
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+ }
}
/**
@@ -940,7 +944,6 @@
}
QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
- ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly
//joined nodes
ClusterFixture::Args args;
@@ -948,24 +951,26 @@
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(RING, 0, 5);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- send(c1, "q", 5);
- lockMessages(c1, "q", 1);
- //add new node
- cluster.add();
- BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
- //send one more message
- send(c1, "q", 1, 6);
- //release locked message
- c1.close();
- //check state of queue on both nodes
- checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //send one more message
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
}
QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
- ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly joined
//nodes; just like testRingQueueUpdate, but new node joins after
//the sixth message has been sent.
@@ -974,24 +979,26 @@
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- QueueOptions options;
- options.setSizePolicy(RING, 0, 5);
- c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- send(c1, "q", 5);
- lockMessages(c1, "q", 1);
- //send sixth message
- send(c1, "q", 1, 6);
- //add new node
- cluster.add();
- BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
- //release locked message
- c1.close();
- //check state of queue on both nodes
- checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+ }
}
QPID_AUTO_TEST_CASE(testRelease) {
- ScopedSuppressLogging allQuiet;
//tests that releasing a messages that was unacked when one node
//joined works correctly
ClusterFixture::Args args;
@@ -999,31 +1006,34 @@
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c1(cluster[0], "c1");
- c1.session.queueDeclare("q", arg::durable=durableFlag);
- for (int i = 0; i < 5; i++) {
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ {
+ ScopedSuppressLogging allQuiet;
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
}
- //receive but don't ack a message
- LocalQueue lq;
- SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
- lqSettings.autoAck = 0;
- Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
- c1.session.messageFlush("q");
- Message received;
- BOOST_CHECK(lq.get(received));
- BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
-
- //add new node
- cluster.add();
-
- lqSub.release(lqSub.getUnaccepted());
-
- //check state of queue on both nodes
- vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
- Client c3(cluster[0], "c3");
- BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
- Client c2(cluster[1], "c2");
- BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
}
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org