You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/03 23:06:56 UTC

svn commit: r1428634 - /qpid/trunk/qpid/cpp/src/tests/brokertest.py

Author: aconway
Date: Thu Jan  3 22:06:56 2013
New Revision: 1428634

URL: http://svn.apache.org/viewvc?rev=1428634&view=rev
Log:
QPID-4514: Remove obsolete cluster code: brokertest.py

Clean up cluster obsolete code in brokertest.py.

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1428634&r1=1428633&r2=1428634&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Jan  3 22:06:56 2013
@@ -17,8 +17,7 @@
 # under the License.
 #
 
-# Support library for tests that start multiple brokers, e.g. cluster
-# or federation
+# Support library for tests that start multiple brokers, e.g. HA or federation
 
 import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
 import qpid, traceback, signal
@@ -382,8 +381,7 @@ class Broker(Popen):
         if not retry(self.log_ready, timeout=timeout):
             raise Exception(
                 "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
-        # Create a connection and a session. For a cluster broker this will
-        # return after cluster init has finished.
+        # Create a connection and a session.
         try:
             c = self.connect(**kwargs)
             try: c.session()
@@ -391,54 +389,6 @@ class Broker(Popen):
         except Exception,e: raise RethrownException(
             "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
 
-    def store_state(self):
-        f = open(os.path.join(self.datadir, "cluster", "store.status"))
-        try: uuids = f.readlines()
-        finally: f.close()
-        null_uuid="00000000-0000-0000-0000-000000000000\n"
-        if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
-        if uuids[0] == null_uuid: return "empty"
-        if uuids[1] == null_uuid: return "dirty"
-        return "clean"
-
-class Cluster:
-    """A cluster of brokers in a test."""
-    # Client connection options for use in failover tests.
-    CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
-
-    _cluster_count = 0
-
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
-        self.test = test
-        self._brokers=[]
-        self.name = "cluster%d" % Cluster._cluster_count
-        Cluster._cluster_count += 1
-        # Use unique cluster name
-        self.args = copy(args)
-        self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
-        self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
-        assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
-        self.args += [ "--load-module", BrokerTest.cluster_lib ]
-        self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
-
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
-        """Add a broker to the cluster. Returns the index of the new broker."""
-        if not name: name="%s-%d" % (self.name, len(self._brokers))
-        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
-        return self._brokers[-1]
-
-    def ready(self,  timeout=30, **kwargs):
-        for b in self: b.ready(**kwargs)
-
-    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
-        for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
-
-    # Behave like a list of brokers.
-    def __len__(self): return len(self._brokers)
-    def __getitem__(self,index): return self._brokers[index]
-    def __iter__(self): return self._brokers.__iter__()
-
-
 def browse(session, queue, timeout=0, transform=lambda m: m.content):
     """Return a list with the contents of each message on queue."""
     r = session.receiver("%s;{mode:browse}"%(queue))
@@ -475,7 +425,6 @@ class BrokerTest(TestCase):
 
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
-    cluster_lib = os.getenv("CLUSTER_LIB")
     ha_lib = os.getenv("HA_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -527,11 +476,6 @@ class BrokerTest(TestCase):
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
-        """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
-        return cluster
-
     def browse(self, *args, **kwargs): browse(*args, **kwargs)
     def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
     def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
@@ -560,13 +504,16 @@ class StoppableThread(Thread):
         join(self)
         if self.error: raise self.error
 
+# Options for a client that wants to reconnect automatically.
+RECONNECT_OPTIONS="reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
+
 class NumberedSender(Thread):
     """
     Thread to run a sender client and send numbered messages until stopped.
     """
 
     def __init__(self, broker, max_depth=None, queue="test-queue",
-                 connection_options=Cluster.CONNECTION_OPTIONS,
+                 connection_options=RECONNECT_OPTIONS,
                  failover_updates=True, url=None, args=[]):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
@@ -629,7 +576,7 @@ class NumberedReceiver(Thread):
     sequentially numbered messages.
     """
     def __init__(self, broker, sender=None, queue="test-queue",
-                 connection_options=Cluster.CONNECTION_OPTIONS,
+                 connection_options=RECONNECT_OPTIONS,
                  failover_updates=True, url=None):
         """
         sender: enable flow control. Call sender.received(n) for each message received.
@@ -678,31 +625,6 @@ class NumberedReceiver(Thread):
         join(self)
         self.check()
 
-class ErrorGenerator(StoppableThread):
-    """
-    Thread that continuously generates errors by trying to consume from
-    a non-existent queue. For cluster regression tests, error handling
-    caused issues in the past.
-    """
-
-    def __init__(self, broker):
-        StoppableThread.__init__(self)
-        self.broker=broker
-        broker.test.cleanup_stop(self)
-        self.start()
-
-    def run(self):
-        c = self.broker.connect_old()
-        try:
-            while not self.stopped:
-                try:
-                    c.session(str(qpid.datatypes.uuid4())).message_subscribe(
-                        queue="non-existent-queue")
-                    assert(False)
-                except qpid.session.SessionException: pass
-                time.sleep(0.01)
-        except: pass                    # Normal if broker is killed.
-
 def import_script(path):
     """
     Import executable script at path as a module.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org