You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/01/25 19:20:49 UTC
svn commit: r1438629 [6/10] - in
/qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/
qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qpid/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/t/ qpid/cpp/bindi...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp Fri Jan 25 18:20:39 2013
@@ -22,6 +22,7 @@
#include "unit_test.h"
#include "MessagingFixture.h"
#include "qpid/management/Buffer.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/messaging/Message.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/log/Logger.h"
@@ -323,361 +324,6 @@ QPID_AUTO_TEST_CASE(v2ObjPublish)
delete tm;
}
-
-// verify that a deleted object is exported correctly using the
-// exportDeletedObjects() method. V1 testcase.
-//
-QPID_AUTO_TEST_CASE(v1ExportDelObj)
-{
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("myObj"));
- uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
-
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- agent->addObject(tm->GetManagementObject(), 1);
-
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
-
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
-
- // destroy the object, then immediately export (before the next poll cycle)
-
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
-
- // wait for the deleted object to be published
-
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
-
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
-
- TestManageable::validateTestObjectProperties(**oIter);
-
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
-
- BOOST_CHECK(isDeleted);
-
- // verify there are no deleted objects to export now.
-
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
-
- r1.close();
- delete fix;
- delete tm;
-}
-
-
-// verify that a deleted object is imported correctly using the
-// importDeletedObjects() method. V1 testcase.
-//
-QPID_AUTO_TEST_CASE(v1ImportDelObj)
-{
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("anObj"));
- uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
-
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- agent->addObject(tm->GetManagementObject(), 1);
-
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
-
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
-
- // destroy the object, then immediately export (before the next poll cycle)
-
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
-
- // destroy the broker, and reinistantiate a new one without populating it
- // with a TestObject.
-
- r1.close();
- delete fix;
- delete tm; // should no longer be necessary
-
- fix = new AgentFixture(3);
- r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
-
- // wait for the deleted object to be published
-
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
-
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
-
- TestManageable::validateTestObjectProperties(**oIter);
-
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
-
- BOOST_CHECK(isDeleted);
-
- // verify there are no deleted objects to export now.
-
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
-
- r1.close();
- delete fix;
-}
-
-
-// verify that an object that is added and deleted prior to the
-// first poll cycle is accounted for by the export
-//
-QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
-{
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("objectifyMe"));
-
- // add, then immediately delete and export the object...
-
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->addObject(tm->GetManagementObject(), 999);
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
-
- delete fix;
- delete tm;
-}
-
-
-// Verify that we can export and import multiple deleted objects correctly.
-//
-QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
-{
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- // populate the agent with multiple test objects
- const size_t objCount = 50;
- std::vector<TestManageable *> tmv;
- uint32_t objLen;
-
- for (size_t i = 0; i < objCount; i++) {
- std::stringstream key;
- key << "testobj-" << std::setfill('x') << std::setw(4) << i;
- // (no, seriously, I didn't just do that.)
- // Note well: we have to keep the key string length EXACTLY THE SAME
- // FOR ALL OBJECTS, so objLen will be the same. Otherwise the
- // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
- TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObject()->writePropertiesSize();
- agent->addObject(tm->GetManagementObject(), i + 1);
- tmv.push_back(tm);
- }
-
- // wait for the objects to be published
- Message m1;
- uint32_t msgCount = 0;
- while(r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- msgCount += objs.size();
- }
-
- BOOST_CHECK_EQUAL(msgCount, objCount);
-
- // destroy some of the objects, then immediately export (before the next poll cycle)
-
- uint32_t delCount = 0;
- for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObject()->resourceDestroy();
- delCount++;
- }
-
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK_EQUAL(delObjs.size(), delCount);
-
- // destroy the broker, and reinistantiate a new one without populating it
- // with TestObjects.
-
- r1.close();
- delete fix;
- while (tmv.size()) {
- delete tmv.back();
- tmv.pop_back();
- }
-
- fix = new AgentFixture(3);
- r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
-
- // wait for the deleted object to be published, verify the count
-
- uint32_t countDels = 0;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
-
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
-
- TestManageable::validateTestObjectProperties(**oIter);
-
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- countDels++;
- }
- }
-
- // make sure we get the correct # of deleted objects
- BOOST_CHECK_EQUAL(countDels, delCount);
-
- // verify there are no deleted objects to export now.
-
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
-
- r1.close();
- delete fix;
-}
-
-// Verify that we can export and import multiple deleted objects correctly.
-// QMF V2 variant
-QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
-{
- AgentFixture* fix = new AgentFixture(3, true);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- // populate the agent with multiple test objects
- const size_t objCount = 50;
- std::vector<TestManageable *> tmv;
-
- for (size_t i = 0; i < objCount; i++) {
- std::stringstream key;
- key << "testobj-" << i;
- TestManageable *tm = new TestManageable(agent, key.str());
- if (tm->GetManagementObject()->writePropertiesSize()) {}
- agent->addObject(tm->GetManagementObject(), key.str());
- tmv.push_back(tm);
- }
-
- // wait for the objects to be published
- Message m1;
- uint32_t msgCount = 0;
- while(r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV2ObjectUpdates(m1, objs);
- msgCount += objs.size();
- }
-
- BOOST_CHECK_EQUAL(msgCount, objCount);
-
- // destroy some of the objects, then immediately export (before the next poll cycle)
-
- uint32_t delCount = 0;
- for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObject()->resourceDestroy();
- delCount++;
- }
-
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK_EQUAL(delObjs.size(), delCount);
-
- // destroy the broker, and reinistantiate a new one without populating it
- // with TestObjects.
-
- r1.close();
- delete fix;
- while (tmv.size()) {
- delete tmv.back();
- tmv.pop_back();
- }
-
- fix = new AgentFixture(3, true);
- r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
-
- // wait for the deleted object to be published, verify the count
-
- uint32_t countDels = 0;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
-
- TestManageable::validateTestObjectProperties(**oIter);
-
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- countDels++;
- }
- }
-
- // make sure we get the correct # of deleted objects
- BOOST_CHECK_EQUAL(countDels, delCount);
-
- // verify there are no deleted objects to export now.
-
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
-
- r1.close();
- delete fix;
-}
-
// See QPID-2997
QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt Fri Jan 25 18:20:39 2013
@@ -314,6 +314,7 @@ if (PYTHON_EXECUTABLE)
add_test (ha_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
+ add_test (federation_sys_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_sys_tests${test_script_suffix})
if (BUILD_ACL)
add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})
endif (BUILD_ACL)
@@ -340,7 +341,12 @@ add_library (dlclose_noop MODULE dlclose
#
## Longer running stability tests, not run by default check: target.
## Not run under valgrind, too slow
-#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak
+#LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest
#EXTRA_DIST+=$(LONG_TESTS) run_perftest
#check-long:
# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
+
+#
+# legacystore
+#
+add_subdirectory(legacystore)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am Fri Jan 25 18:20:39 2013
@@ -108,8 +108,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
TopicExchangeTest.cpp \
TxBufferTest.cpp \
ConnectionOptions.h \
- ForkedBroker.h \
- ForkedBroker.cpp \
ManagementTest.cpp \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
@@ -242,11 +240,6 @@ header_test_INCLUDES=$(PUBLIC_INCLUDES)
header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
header_test_LDADD=$(lib_client)
-check_PROGRAMS+=failover_soak
-failover_soak_INCLUDES=$(PUBLIC_INCLUDES)
-failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h ForkedBroker.cpp
-failover_soak_LDADD=$(lib_client) $(lib_broker)
-
check_PROGRAMS+=declare_queues
declare_queues_INCLUDES=$(PUBLIC_INCLUDES)
declare_queues_SOURCES=declare_queues.cpp
@@ -319,7 +312,6 @@ EXTRA_DIST += \
ssl_test \
ping_broker \
config.null \
- cpg_check.sh.in \
run_federation_tests \
run_federation_sys_tests \
run_long_federation_sys_tests \
@@ -352,6 +344,7 @@ EXTRA_DIST += \
run_ha_tests \
ha_test.py \
ha_tests.py \
+ brokertest.py \
ha_store_tests.py \
test_env.ps1.in
@@ -375,8 +368,6 @@ EXTRA_DIST+= \
shared_perftest \
multiq_perftest \
topic_perftest \
- run_failover_soak \
- federated_cluster_test_with_node_failure \
sasl_test_setup.sh \
run_msg_group_tests_soak \
qpidd-empty.conf
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessageUtils.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessageUtils.h Fri Jan 25 18:20:39 2013
@@ -1,3 +1,6 @@
+#ifndef TESTS_MESSAGEUTILS_H
+#define TESTS_MESSAGEUTILS_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -98,3 +101,5 @@ struct MessageUtils
};
}} // namespace qpid::tests
+
+#endif /*!TESTS_MESSAGEUTILS_H*/
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/QueueTest.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/QueueTest.cpp Fri Jan 25 18:20:39 2013
@@ -40,6 +40,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Timer.h"
#include <iostream>
#include <vector>
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Variant.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Variant.cpp Fri Jan 25 18:20:39 2013
@@ -135,6 +135,16 @@ QPID_AUTO_TEST_CASE(testConversionsFromS
BOOST_CHECK_EQUAL(0, value.asInt16());
BOOST_CHECK_EQUAL(0u, value.asUint16());
+ value = "-Blah";
+ BOOST_CHECK_THROW(value.asUint16(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt16(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asUint32(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt32(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asUint64(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asInt64(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asFloat(), InvalidConversion);
+ BOOST_CHECK_THROW(value.asDouble(), InvalidConversion);
+
value = "-000";
BOOST_CHECK_EQUAL(0, value.asInt16());
BOOST_CHECK_EQUAL(0u, value.asUint16());
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py Fri Jan 25 18:20:39 2013
@@ -17,8 +17,7 @@
# under the License.
#
-# Support library for tests that start multiple brokers, e.g. cluster
-# or federation
+# Support library for tests that start multiple brokers, e.g. HA or federation
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
@@ -382,8 +381,7 @@ class Broker(Popen):
if not retry(self.log_ready, timeout=timeout):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
- # Create a connection and a session. For a cluster broker this will
- # return after cluster init has finished.
+ # Create a connection and a session.
try:
c = self.connect(**kwargs)
try: c.session()
@@ -391,54 +389,6 @@ class Broker(Popen):
except Exception,e: raise RethrownException(
"Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
- def store_state(self):
- f = open(os.path.join(self.datadir, "cluster", "store.status"))
- try: uuids = f.readlines()
- finally: f.close()
- null_uuid="00000000-0000-0000-0000-000000000000\n"
- if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
- if uuids[0] == null_uuid: return "empty"
- if uuids[1] == null_uuid: return "dirty"
- return "clean"
-
-class Cluster:
- """A cluster of brokers in a test."""
- # Client connection options for use in failover tests.
- CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
-
- _cluster_count = 0
-
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- self.test = test
- self._brokers=[]
- self.name = "cluster%d" % Cluster._cluster_count
- Cluster._cluster_count += 1
- # Use unique cluster name
- self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
- self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
-
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
- """Add a broker to the cluster. Returns the index of the new broker."""
- if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
- return self._brokers[-1]
-
- def ready(self, timeout=30, **kwargs):
- for b in self: b.ready(**kwargs)
-
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
- for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
@@ -475,7 +425,6 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
- cluster_lib = os.getenv("CLUSTER_LIB")
ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -527,11 +476,6 @@ class BrokerTest(TestCase):
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- """Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
- return cluster
-
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
@@ -560,13 +504,16 @@ class StoppableThread(Thread):
join(self)
if self.error: raise self.error
+# Options for a client that wants to reconnect automatically.
+RECONNECT_OPTIONS="reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
+ connection_options=RECONNECT_OPTIONS,
failover_updates=True, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
@@ -629,7 +576,7 @@ class NumberedReceiver(Thread):
sequentially numbered messages.
"""
def __init__(self, broker, sender=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
+ connection_options=RECONNECT_OPTIONS,
failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
@@ -678,31 +625,6 @@ class NumberedReceiver(Thread):
join(self)
self.check()
-class ErrorGenerator(StoppableThread):
- """
- Thread that continuously generates errors by trying to consume from
- a non-existent queue. For cluster regression tests, error handling
- caused issues in the past.
- """
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.broker=broker
- broker.test.cleanup_stop(self)
- self.start()
-
- def run(self):
- c = self.broker.connect_old()
- try:
- while not self.stopped:
- try:
- c.session(str(qpid.datatypes.uuid4())).message_subscribe(
- queue="non-existent-queue")
- assert(False)
- except qpid.session.SessionException: pass
- time.sleep(0.01)
- except: pass # Normal if broker is killed.
-
def import_script(path):
"""
Import executable script at path as a module.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org