You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/03 23:06:56 UTC
svn commit: r1428634 - /qpid/trunk/qpid/cpp/src/tests/brokertest.py
Author: aconway
Date: Thu Jan 3 22:06:56 2013
New Revision: 1428634
URL: http://svn.apache.org/viewvc?rev=1428634&view=rev
Log:
QPID-4514: Remove obsolete cluster code: brokertest.py
Clean up obsolete cluster code in brokertest.py.
Modified:
qpid/trunk/qpid/cpp/src/tests/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1428634&r1=1428633&r2=1428634&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Jan 3 22:06:56 2013
@@ -17,8 +17,7 @@
# under the License.
#
-# Support library for tests that start multiple brokers, e.g. cluster
-# or federation
+# Support library for tests that start multiple brokers, e.g. HA or federation
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
@@ -382,8 +381,7 @@ class Broker(Popen):
if not retry(self.log_ready, timeout=timeout):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
- # Create a connection and a session. For a cluster broker this will
- # return after cluster init has finished.
+ # Create a connection and a session.
try:
c = self.connect(**kwargs)
try: c.session()
@@ -391,54 +389,6 @@ class Broker(Popen):
except Exception,e: raise RethrownException(
"Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
- def store_state(self):
- f = open(os.path.join(self.datadir, "cluster", "store.status"))
- try: uuids = f.readlines()
- finally: f.close()
- null_uuid="00000000-0000-0000-0000-000000000000\n"
- if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
- if uuids[0] == null_uuid: return "empty"
- if uuids[1] == null_uuid: return "dirty"
- return "clean"
-
-class Cluster:
- """A cluster of brokers in a test."""
- # Client connection options for use in failover tests.
- CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
-
- _cluster_count = 0
-
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- self.test = test
- self._brokers=[]
- self.name = "cluster%d" % Cluster._cluster_count
- Cluster._cluster_count += 1
- # Use unique cluster name
- self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
- self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
-
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
- """Add a broker to the cluster. Returns the index of the new broker."""
- if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
- return self._brokers[-1]
-
- def ready(self, timeout=30, **kwargs):
- for b in self: b.ready(**kwargs)
-
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
- for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
@@ -475,7 +425,6 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
- cluster_lib = os.getenv("CLUSTER_LIB")
ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -527,11 +476,6 @@ class BrokerTest(TestCase):
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- """Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
- return cluster
-
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
@@ -560,13 +504,16 @@ class StoppableThread(Thread):
join(self)
if self.error: raise self.error
+# Options for a client that wants to reconnect automatically.
+RECONNECT_OPTIONS="reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
+ connection_options=RECONNECT_OPTIONS,
failover_updates=True, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
@@ -629,7 +576,7 @@ class NumberedReceiver(Thread):
sequentially numbered messages.
"""
def __init__(self, broker, sender=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
+ connection_options=RECONNECT_OPTIONS,
failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
@@ -678,31 +625,6 @@ class NumberedReceiver(Thread):
join(self)
self.check()
-class ErrorGenerator(StoppableThread):
- """
- Thread that continuously generates errors by trying to consume from
- a non-existent queue. For cluster regression tests, error handling
- caused issues in the past.
- """
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.broker=broker
- broker.test.cleanup_stop(self)
- self.start()
-
- def run(self):
- c = self.broker.connect_old()
- try:
- while not self.stopped:
- try:
- c.session(str(qpid.datatypes.uuid4())).message_subscribe(
- queue="non-existent-queue")
- assert(False)
- except qpid.session.SessionException: pass
- time.sleep(0.01)
- except: pass # Normal if broker is killed.
-
def import_script(path):
"""
Import executable script at path as a module.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org