You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/05/15 18:27:08 UTC

svn commit: r1594967 - in /qpid/dispatch/trunk: src/waypoint_private.h tests/CMakeLists.txt tests/system_test.py tests/system_tests_broker.py tools/qdtest.in

Author: aconway
Date: Thu May 15 16:27:08 2014
New Revision: 1594967

URL: http://svn.apache.org/r1594967
Log:
DISPATCH-52: Integrate system_tests_broker, fix outstanding issues.

system_tests_broker:
- Finish 3 node simple workqueue test, passing. Add to qdtest.sh.
- Skip tests if requirements (qpidd, qpid_messaging etc.) not available.
- Start brokers in setUpClass, re-use for all tests (currently only 1)

system_test:
- Add auto-flush to Messenger
- Support for setUpClass, tearDownClass including workaround for python 2.6

Developer doc:
- Add overview description of waypoints

Ran pylint on both

Modified:
    qpid/dispatch/trunk/src/waypoint_private.h
    qpid/dispatch/trunk/tests/CMakeLists.txt
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_broker.py
    qpid/dispatch/trunk/tools/qdtest.in

Modified: qpid/dispatch/trunk/src/waypoint_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint_private.h?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint_private.h (original)
+++ qpid/dispatch/trunk/src/waypoint_private.h Thu May 15 16:27:08 2014
@@ -25,8 +25,23 @@
 
 /**
  * @file
- * A waypoint is a point on a multi-phase route where messages can exit and re-enter the router.
- * For example after being sent through an external broker's queue.
+ *
+ * A waypoint sends/receives messages to/from an external entity such as a
+ * broker as part of a multi-phase address.
+ *
+ * An address can have multiple phases. Each phase acts like a separate address,
+ * but sharing the same address string.
+ *
+ * Phases are not visible to normal senders/receivers, they are set by
+ * waypoints. Messages from normal senders go to the phase=0 address.  Normal
+ * subscribers subscribe to the highest phase defined for the address.
+ *
+ * A waypoint takes messages for its in-phase and sends them to the external
+ * entity. Messages received from the external entity are given the waypoint's
+ * out-phase. Waypoints can be "chained" with the out-phase of one equal to the
+ * in-phase for the next. Thus waypoints provide a way to route messages via
+ * multiple external entities between a sender and a subscriber using the same
+ * address.
  */
 
 void qd_waypoint_activate_all(qd_dispatch_t *qd);

Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Thu May 15 16:27:08 2014
@@ -53,7 +53,7 @@ add_test(unit_tests_size_1     unit_test
 add_test(unit_tests            unit_tests ${CMAKE_CURRENT_SOURCE_DIR}/threads4.conf)
 add_test(router_tests          python ${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v)
 
-set(SYSTEM_TEST_FILES system_tests_one_router.py system_tests_two_routers.py)
+set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py system_tests_two_routers.py system_tests_broker.py)
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_build.sh.in ${CMAKE_CURRENT_BINARY_DIR}/config_build.sh)
 

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Thu May 15 16:27:08 2014
@@ -26,9 +26,7 @@ Features:
 - Tools to manipulate qpidd and qdrouter configuration files.
 - Sundry other tools.
 
-FIXME aconway 2014-03-27: we need to check what is installed & skip tests that can't be run.
-
-Current we assume the following are installed: 
+Requires the following:
  - proton with python bindings
  - qpidd with AMQP 1.0 support
  - qpidtoollibs python module from qpid/tools
@@ -40,12 +38,12 @@ You can set this up from packages on fed
 
 Here's how to build from source assuming you use default install prefix /usr/local
 
+With a  qpid-proton checkout at $PROTON
+ cd $PROTON/<build-directory>; make install
 With a qpid checkout at $QPID:
  cd $QPID/qpid/cpp/<build-directory>; make install
  cd $QPID/qpid/tools; ./setup.py install --prefix /usr/local
  cd $QPID/qpid/python; ./setup.py install --prefix /usr/local
-With a  qpid-proton checkout at $PROTON
- cd $PROTON/<build-directory>; make install
 
 And finally make sure to set up your environment:
 
@@ -54,27 +52,70 @@ export PYTHONPATH="$PYTHONPATH:/usr/loca
 export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64"
 """
 
-import sys, os, time, socket, random
-import subprocess, tempfile, shutil
-import unittest
-import qpidtoollibs
-import qpid_messaging as qm
-import proton
-from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
+import os, time, socket, random, subprocess, shutil, unittest
 from copy import copy
+import proton
+from proton import Message
+
+# Optional modules
+MISSING_MODULES = []
+
+try:
+    import qpidtoollibs
+except ImportError, err:
+    qpidtoollibs = None         # pylint: disable=invalid-name
+    MISSING_MODULES.append(str(err))
+
+try:
+    import qpid_messaging as qm
+except ImportError, err:
+    qm = None                   # pylint: disable=invalid-name
+    MISSING_MODULES.append(str(err))
+
+def find_exe(program):
+    """Find an executable in the system PATH"""
+    def is_exe(fpath):
+        """True if fpath is executable"""
+        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+    mydir = os.path.split(program)[0]
+    if mydir:
+        if is_exe(program):
+            return program
+    else:
+        for path in os.environ["PATH"].split(os.pathsep):
+            exe_file = os.path.join(path, program)
+            if is_exe(exe_file):
+                return exe_file
+    return None
+
+
+def _check_requirements():
+    """If requirements are missing, return a message, else return empty string."""
+    missing = MISSING_MODULES
+    required_exes = ['qpidd', 'qdrouterd']
+    missing += ["No exectuable  %s"%e for e in required_exes if not find_exe(e)]
+    if find_exe('qpidd'):
+        p = subprocess.Popen(['qpidd', '--help'], stdout=subprocess.PIPE)
+        if not "AMQP 1.0" in p.communicate()[0]:
+            missing.append("No AMQP 1.0 support in qpidd")
+    if missing:
+        return "%s: %s"%(__name__, ", ".join(missing))
+
+MISSING_REQUIREMENTS = _check_requirements()
 
-def retry_delay(deadline, timeout, delay, max_delay):
+def retry_delay(deadline, delay, max_delay):
     """For internal use in retry. Sleep as required
     and return the new delay or None if retry should time out"""
     remaining = deadline - time.time()
-    if remaining <= 0: return None
+    if remaining <= 0:
+        return None
     time.sleep(min(delay, remaining))
     return min(delay*2, max_delay)
 
 
-default_timeout=float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
+DEFAULT_TIMEOUT = float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
 
-def retry(function, timeout=default_timeout, delay=.001, max_delay=1):
+def retry(function, timeout=DEFAULT_TIMEOUT, delay=.001, max_delay=1):
     """Call function until it returns a true value or timeout expires.
     Double the delay for each retry up to max_delay.
     Returns what function returns or None if timeout expires.
@@ -85,10 +126,11 @@ def retry(function, timeout=default_time
         if ret:
             return ret
         else:
-            delay = retry_delay(deadline, timeout, delay, max_delay)
-            if delay is None: return None
+            delay = retry_delay(deadline, delay, max_delay)
+            if delay is None:
+                return None
 
-def retry_exception(function, timeout=default_timeout, delay=.001, max_delay=1, exception_test=None):
+def retry_exception(function, timeout=DEFAULT_TIMEOUT, delay=.001, max_delay=1, exception_test=None):
     """Call function until it returns without exception or timeout expires.
     Double the delay for each retry up to max_delay.
     Calls exception_test with any exception raised by function, exception_test
@@ -100,10 +142,12 @@ def retry_exception(function, timeout=de
     while True:
         try:
             return function()
-        except Exception, e:
-            if exception_test: exception_test(e)
-            delay = retry_delay(deadline, timeout, delay, max_delay)
-            if delay is None: raise
+        except Exception, e:    # pylint: disable=broad-except
+            if exception_test:
+                exception_test(e)
+            delay = retry_delay(deadline, delay, max_delay)
+            if delay is None:
+                raise
 
 def port_available(port, host='0.0.0.0'):
     """Return true if connecting to host:port gives 'connection refused'."""
@@ -113,17 +157,21 @@ def port_available(port, host='0.0.0.0')
         s.close()
     except socket.error, e:
         return e.errno == 111
-    except: pass
+    except:
+        pass
     return False
 
 def wait_port(port, host='0.0.0.0', **retry_kwargs):
     """Wait up to timeout for port (on host) to be connectable.
     Takes same keyword arguments as retry to control the timeout"""
-    def check(e):               # Only retry on connection refused
-        if not isinstance(e, socket.error) or not e.errno == 111: raise
+    def check(e):
+        """Only retry on connection refused"""
+        if not isinstance(e, socket.error) or not e.errno == 111:
+            raise
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    try: retry_exception(lambda: s.connect((host, port)), exception_test=check,
-                         **retry_kwargs)
+    try:
+        retry_exception(lambda: s.connect((host, port)), exception_test=check,
+                        **retry_kwargs)
     except Exception, e:
         raise Exception("wait_port timeout on %s:%s: %s"%(host, port, e))
 
@@ -132,21 +180,23 @@ def wait_port(port, host='0.0.0.0', **re
 def wait_ports(ports, host="127.0.0.1", **retry_kwargs):
     """Wait up to timeout for all ports (on host) to be connectable.
     Takes same keyword arguments as retry to control the timeout"""
-    for p in ports: wait_port(p)
+    for p in ports:
+        wait_port(p, host=host, **retry_kwargs)
 
 def message(**properties):
     """Convenience to create a proton.Message with properties set"""
     m = Message()
-    for name, value in properties.iteritems(): setattr(m, name, value)
+    for name, value in properties.iteritems():
+        setattr(m, name, value)
     return m
 
 class Process(subprocess.Popen):
     """Popen that can be torn down at the end of a TestCase and stores its output."""
 
     # Expected states of a Process at teardown
-    RUNNING=1                   # Still running
-    EXIT_OK=2                   # Exit status 0
-    EXIT_FAIL=3                 # Exit status not 0
+    RUNNING = 1                   # Still running
+    EXIT_OK = 2                   # Exit status 0
+    EXIT_FAIL = 3                 # Exit status not 0
 
     def __init__(self, name, args, expect=EXIT_OK, **kwargs):
         self.name, self.args, self.expect = name, args, expect
@@ -155,24 +205,36 @@ class Process(subprocess.Popen):
         super(Process, self).__init__(
             args, stdout=self.out, stderr=subprocess.STDOUT, **kwargs)
 
-    def assert_running(self): assert self.poll() is None, "%s exited"%name
+    def assert_running(self):
+        """Assert that the proces is still running"""
+        assert self.poll() is None, "%s exited" % self.name
 
     def teardown(self):
-        if self.torndown: return
+        """Check process status and stop the process if necessary"""
+        if self.torndown:
+            return
         self.torndown = True
         status = self.poll()
-        if status is None: self.kill()
+        if status is None:
+            self.kill()
         self.out.close()
         self.check_exit(status)
 
     def check_exit(self, status):
+        """Check process exit status"""
         def check(condition, expect):
-            if status is None: actual="still running"
-            else: actual="exit %s"%status
+            """assert condition with a suitable message for status"""
+            if status is None:
+                actual = "still running"
+            else:
+                actual = "exit %s"%status
             assert condition, "Expected %s but %s: %s"%(expect, actual, self.name)
-        if self.expect == Process.RUNNING: check(status is None, "still running"),
-        elif self.expect == Process.EXIT_OK: check(status == 0, "exit 0"),
-        elif self.expect == Process.EXIT_FAIL: check(status != 0, "exit non-0")
+        if self.expect == Process.RUNNING:
+            check(status is None, "still running")
+        elif self.expect == Process.EXIT_OK:
+            check(status == 0, "exit 0")
+        elif self.expect == Process.EXIT_FAIL:
+            check(status != 0, "exit non-0")
 
 class Config(object):
     """Base class for configuration objects that provide a convenient
@@ -181,7 +243,8 @@ class Config(object):
     def write(self, name, suffix=".conf"):
         """Write the config object to file name.suffix. Returns name.suffix."""
         name = name+suffix
-        with open(name,'w') as f: f.write(str(self))
+        with open(name, 'w') as f:
+            f.write(str(self))
         return name
 
 
@@ -189,30 +252,32 @@ class Qdrouterd(Process):
     """Run a Qpid Dispatch Router Daemon"""
 
     class Config(list, Config):
-        """List of ('section', {'name':'value',...}).
+        """List of ('section', {'name':'value', ...}).
         Fills in some default values automatically, see Qdrouterd.DEFAULTS
         """
 
         DEFAULTS = {
             'listener':{'sasl-mechanisms':'ANONYMOUS'},
-            'connector':{'sasl-mechanisms':'ANONYMOUS','role':'on-demand'}
+            'connector':{'sasl-mechanisms':'ANONYMOUS', 'role':'on-demand'}
         }
 
         def sections(self, name):
             """Return list of sections named name"""
-            return [p for n,p in self if n == name]
-
-        def _defs(self, name, props):
-            """Fill in defaults for required values"""
-            if not name in Qdrouterd.Config.DEFAULTS: return props
-            p = copy(Qdrouterd.Config.DEFAULTS[name])
-            p.update(props);
-            return p
+            return [p for n, p in self if n == name]
 
         def __str__(self):
             """Generate config file content. Fills in defaults for some require values"""
-            def props(p): return "".join(["    %s: %s\n"%(k,v) for k,v in p.iteritems()])
-            return "".join(["%s {\n%s}\n"%(n,props(self._defs(n,p))) for n,p in self])
+            def defs(name, props):
+                """Fill in defaults for required values"""
+                if not name in Qdrouterd.Config.DEFAULTS:
+                    return props
+                p = copy(Qdrouterd.Config.DEFAULTS[name])
+                p.update(props)
+                return p
+            def props(p):
+                """qpidd.conf format of dict p"""
+                return "".join(["    %s: %s\n"%(k, v) for k, v in p.iteritems()])
+            return "".join(["%s {\n%s}\n"%(n, props(defs(n, p))) for n, p in self])
 
     class Agent(object):
         """Management agent"""
@@ -224,19 +289,19 @@ class Qdrouterd(Process):
             self.subscription = self.messenger.subscribe("amqp:/#")
             self.reply_to = self.subscription.address
 
-        def stop(self): self.messenger.stop()
-
-        def get(self, type):
-            """Return a list of attribute dicts for each instance of type"""
-            request = message(address=self.address, reply_to=self.reply_to,
-                              correlation_id=1,
-                              properties={u'operation':u'QUERY', u'entityType':type},
-                              body={'attributeNames':[]})
-            response = Message()
+        def stop(self):
+            """Stop the agent's messenger"""
+            self.messenger.stop()
+
+        def get(self, entity_type):
+            """Return a list of attribute dicts for each instance of entity_type"""
+            request = message(
+                address=self.address, reply_to=self.reply_to,
+                correlation_id=1,
+                properties={u'operation':u'QUERY', u'entityType':entity_type},
+                body={'attributeNames':[]})
             self.messenger.put(request)
-            self.messenger.send()
-            self.messenger.recv(1)
-            self.messenger.get(response)
+            response = self.messenger.fetch()
             if response.properties['statusCode'] != 200:
                 raise Exception("Agent error: %d %s" % (
                     response.properties['statusCode'],
@@ -245,7 +310,7 @@ class Qdrouterd(Process):
             return [dict(zip(attrs, values)) for values in response.body['results']]
 
 
-    def __init__(self, name, config, **kwargs):
+    def __init__(self, name, config=Config()):
         self.config = copy(config)
         super(Qdrouterd, self).__init__(
             name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
@@ -253,29 +318,33 @@ class Qdrouterd(Process):
 
     @property
     def agent(self):
-        if not self._agent: self._agent = self.Agent(self)
+        """Return an management Agent for this router"""
+        if not self._agent:
+            self._agent = self.Agent(self)
         return self._agent
 
     def teardown(self):
-        if self._agent: self._agent.stop()
+        if self._agent:
+            self._agent.stop()
         super(Qdrouterd, self).teardown()
 
     @property
     def ports(self):
         """Return list of configured ports for all listeners"""
-        return [ l['port'] for l in self.config.sections('listener') ]
+        return [l['port'] for l in self.config.sections('listener')]
 
     @property
     def addresses(self):
         """Return amqp://host:port addresses for all listeners"""
-        return [ "amqp://%s:%s"%(l['addr'],l['port']) for l in self.config.sections('listener') ]
+        return ["amqp://%s:%s"%(l['addr'], l['port']) for l in self.config.sections('listener')]
 
     def is_connected(self, port, host='0.0.0.0'):
         """If router has a connection to host:port return the management info.
         Otherwise return None"""
         connections = self.agent.get('org.apache.qpid.dispatch.connection')
         for c in connections:
-            if c['name'] == '%s:%s'%(host, port): return c
+            if c['name'] == '%s:%s'%(host, port):
+                return c
         return None
 
 
@@ -283,17 +352,18 @@ class Qpidd(Process):
     """Run a Qpid Daemon"""
 
     class Config(dict, Config):
-
+        """qpidd.conf contents. Use like  a dict, str() generates qpidd.conf format"""
         def __str__(self):
-            return "".join(["%s=%s\n"%(k,v) for k,v in self.iteritems()])
-        
+            return "".join(["%s=%s\n"%(k, v) for k, v in self.iteritems()])
 
-    def __init__(self, name, config):
+    def __init__(self, name, config=Config(), port=None):
         self.config = Qpidd.Config(
             {'auth':'no',
              'log-to-stderr':'false', 'log-to-file':name+".log",
              'data-dir':name+".data"})
         self.config.update(config)
+        if port:
+            self.config['port'] = port
         super(Qpidd, self).__init__(
             name, ['qpidd', '--config', self.config.write(name)], expect=Process.RUNNING)
         self.port = self.config['port'] or 5672
@@ -302,71 +372,100 @@ class Qpidd(Process):
 
     def qm_connect(self):
         """Make a qpid_messaging connection to the broker"""
+        if not qm:
+            raise Exception("No qpid_messaging module available")
         return qm.Connection.establish(self.address)
 
     @property
     def agent(self, **kwargs):
-        if not self._agent: self._agent = qpidtoollibs.BrokerAgent(self.qm_connect())
+        """Get the management agent for this broker"""
+        if not qpidtoollibs:
+            raise Exception("No qpidtoollibs module available")
+        if not self._agent:
+            self._agent = qpidtoollibs.BrokerAgent(self.qm_connect(), **kwargs)
         return self._agent
 
 
 
-class Messenger(proton.Messenger):
-    """Minor additions to Messenger for tests"""
+# Decorator to add an optional flush argument to a method, defaulting to
+# the _flush value for the messenger.
+def flush_arg(method):
+    """Decorator for Messenger methods that adds an optional flush argument,
+    defaulting to the Messenger default"""
+    def wrapper(self, *args, **kwargs):
+        """Wrapper that adds flush argument"""
+        flush = self._flush # pylint: disable=protected-access
+        if 'flush' in kwargs:
+            flush = kwargs['flush']
+            del kwargs['flush']
+        r = method(self, *args, **kwargs)
+        if flush:
+            self.flush()
+        return r
+    return wrapper
+
+class Messenger(proton.Messenger): # pylint: disable=too-many-public-methods
+    """Convenience additions to proton.Messenger"""
+
+    def __init__(self, name=None, timeout=DEFAULT_TIMEOUT, blocking=True, flush=False):
+        super(Messenger, self).__init__(name)
+        self.timeout = timeout
+        self.blocking = blocking
+        self._flush = flush
 
     def flush(self):
         """Call work() till there is no work left."""
-        while self.work(0.01): pass
+        while self.work(0.01):
+            pass
 
-    def subscribe(self, source):
-        """proton.Messenger.subscribe and work till subscription is visible."""
-        t = proton.Messenger.subscribe(self, source)
-        self.flush()
-        return t
-
-class TestCase(unittest.TestCase):
-    """A test case that creates a separate directory for each test and
-    cleans up during teardown."""
+    @flush_arg
+    def fetch(self, accept=True):
+        """Fetch a single message"""
+        msg = Message()
+        self.recv(1)
+        self.get(msg)
+        if accept:
+            self.accept()
+        return msg
+
+    put = flush_arg(proton.Messenger.put)
+    subscribe = flush_arg(proton.Messenger.subscribe)
+
+class Tester(object):
+    """Tools for use by TestCase
+- Create a directory for the test.
+- Utilities to create processes and servers, manage ports etc.
+- Clean up processes on teardown"""
 
     def __init__(self, *args, **kwargs):
-        super(TestCase, self).__init__(*args, **kwargs)
-        self.save_dir = os.getcwd()
-        # self.id() is normally _module[.module].TestClass.test_name
-        id = self.id().split(".")
-        if len(id) == 1:        # Not the expected format, just use dir = id.
-            dir = id[0]
-        else:                   # use dir = module[.module].TestClass/test_name
-            dir = os.path.join(".".join(id[0:-1]), id[-1])
-        shutil.rmtree(dir, ignore_errors=True)
-        os.makedirs(dir)
-        os.chdir(dir)
         self.cleanup_list = []
-        self.port_range = (20000, 30000)
-        self.next_port = random.randint(*self.port_range)
+        self.save_dir = None
+        self.directory = None
 
-    def tearDown(self):
-        os.chdir(self.save_dir)
+    def setup(self, directory):
+        """Create directory"""
+        self.directory = directory
+        shutil.rmtree(directory, ignore_errors=True)
+        os.makedirs(directory)
+        self.save_dir = os.getcwd()
+        os.chdir(directory)
+
+    def teardown(self):
+        """Clean up (tear-down, stop or close) objects recorded via cleanup()"""
         self.cleanup_list.reverse()
         for t in self.cleanup_list:
             for m in ["teardown", "tearDown", "stop", "close"]:
                 a = getattr(t, m, None)
-                if a: a(); break
-
-    def cleanup(self, x): self.cleanup_list.append(x); return x
+                if a:
+                    a()
+                    break
+        os.chdir(self.save_dir)
 
-    def get_port(self):
-        """Get an unused port"""
-        def advance():          # Advance with wrap-around
-            self.next_port += 1
-            if self.next_port >= self.port_range[1]: self.next_port = port_range[0]
-        start = self.next_port
-        while not port_available(self.next_port):
-            advance()
-            if self.next_port == start:
-                raise Exception("No avaliable ports in range %s", self.port_range)
-        p = self.next_port;
-        advance()
-        return p
+    def cleanup(self, x):
+        """Record object x for clean-up during tear-down.
+        x should have on of the methods teardown, tearDown, stop or close"""
+        self.cleanup_list.append(x)
+        return x
 
     def popen(self, *args, **kwargs):
         """Start a Process that will be cleaned up on teardown"""
@@ -380,16 +479,78 @@ class TestCase(unittest.TestCase):
         """Return a Qpidd that will be cleaned up on teardown"""
         return self.cleanup(Qpidd(*args, **kwargs))
 
-    def messenger(self, name="test-messenger", timeout=default_timeout, blocking=True, cleanup=True):
+    def messenger(self, name=None, cleanup=True, **kwargs):
         """Return a started Messenger that will be cleaned up on teardown."""
-        m = Messenger(name)
-        m.timeout = timeout
-        m.blocking = blocking
+        m = Messenger(name or os.path.basename(self.directory), **kwargs)
         m.start()
-        if cleanup: self.cleanup(m)
+        if cleanup:
+            self.cleanup(m)
         return m
 
-    def message(self, **properties):
-        """Convenience to create a proton.Message with properties set"""
-        global message
-        return message(**properties)
+    port_range = (20000, 30000)
+    next_port = random.randint(port_range[0], port_range[1])
+
+    @classmethod
+    def get_port(cls):
+        """Get an unused port"""
+        def advance():
+            """Advance with wrap-around"""
+            cls.next_port += 1
+            if cls.next_port >= cls.port_range[1]:
+                cls.next_port = cls.port_range[0]
+        start = cls.next_port
+        while not port_available(cls.next_port):
+            advance()
+            if cls.next_port == start:
+                raise Exception("No avaliable ports in range %s", cls.port_range)
+        p = cls.next_port
+        advance()
+        return p
+
+
+class TestCase(unittest.TestCase, Tester): # pylint: disable=too-many-public-methods
+    """A TestCase that sets up its own working directory and is also a Tester."""
+
+    def __init__(self, test_method):
+        unittest.TestCase.__init__(self, test_method)
+        Tester.__init__(self)
+
+    @classmethod
+    def base_dir(cls):
+        return os.path.abspath(os.path.join(__name__, cls.__name__))
+
+    @classmethod
+    def setUpClass(cls):
+        shutil.rmtree(cls.base_dir(), ignore_errors=True) # Clear old test tree.
+        cls.tester = Tester()
+        cls.tester.setup(os.path.join(cls.base_dir(), 'setup_class'))
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.tester.teardown()
+
+    def setUp(self):
+        # self.id() is normally the fully qualified method name
+        Tester.setup(self, os.path.join(self.base_dir(), self.id().split(".")[-1]))
+
+    def tearDown(self):
+        Tester.teardown(self)
+
+    def skipTest(self, reason):
+        """Workaround missing unittest.TestCase.skipTest in python 2.6.
+        The caller must return in order to end the test"""
+        if hasattr(unittest.TestCase, 'skipTest'):
+            self.skipTest(reason)
+        else:
+            print "Skipping test", id(), reason
+
+    # Hack to support setUpClass/tearDownClass on older versions of python.
+    # The default TestLoader sorts tests alphabetically so we insert
+    # fake tests that will run first and last to call the class setup/teardown functions.
+    if not hasattr(unittest.TestCase, 'setUpClass'):
+        def test_0000_setup_class(self):
+            """Fake test to call setUpClass"""
+            self.__class__.setUpClass()
+        def test_zzzz_teardown_class(self):
+            """Fake test to call tearDownClass"""
+            self.__class__.tearDownClass()

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Thu May 15 16:27:08 2014
@@ -17,70 +17,94 @@
 # under the License.
 #
 
-"""System tests involving one or more brokers and dispatch routers
-
-FIXME aconway 2014-04-29:
-
-These tests is a work in progress, they do not pass
-and they are not run by the qdtest script.
-
-They are provided as an example of how to use the system_test module.
-
-To run the tests from a dispatch checkout:
- . config.sh; python tests/system_tests_broker.py
-Note the tests wil 
 """
+System tests involving one or more brokers and dispatch routers integrated
+with waypoints.
+"""
+import unittest, system_test
+from system_test import wait_port, wait_ports, Qdrouterd, retry, message, MISSING_REQUIREMENTS
 
-from system_test import *
-
-class BrokerSystemTest(TestCase):
-
-    def test_broker(self):
-        testq = 'testq'
+class BrokerSystemTest(system_test.TestCase): # pylint: disable=too-many-public-methods
+    """System tests involving routers and qpidd brokers"""
 
-        # Start two qpidd brokers called qpidd0 and qpidd1
-        qpidd = [
-            self.qpidd('qpidd%s'%i,
-                       Qpidd.Config({'port':self.get_port(), 'trace':1}))
-                  for i in xrange(2) ]
-
-        # FIXME aconway 2014-05-13: router waypoint connection seems fragile
-        # unless everything is set up beforehand.
-        wait_ports([q.port for q in qpidd])
-        qpidd[0].agent.addQueue(testq)
+    # Hack for python 2.6 which does not support setupClass.
+    # We set setup_ok = true in setupClass, and skip all tests if it's not true.
+    setup_ok = False
+
+    @classmethod
+    def setUpClass(cls):
+        """Start 3 qpidd brokers, wait for them to be ready."""
+        super(BrokerSystemTest, cls).setUpClass()
+        cls.qpidd = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
+                    for i in xrange(3)]
+        for q in cls.qpidd:
+            wait_port(q.port)
+        cls.setup_ok = True
+
+    @classmethod
+    def tearDownClass(cls):
+        if cls.setup_ok:
+            cls.setup_ok = False
+            super(BrokerSystemTest, cls).tearDownClass()
+
+    def test_distrbuted_queue(self):
+        """Static distributed queue, one router, three brokers"""
+        if not self.setup_ok:
+            return self.skipTest("setUpClass failed")
+        testq = self.id()       # The distributed queue name
+        for q in self.qpidd:
+            q.agent.addQueue(testq)
 
         # Start a qdrouterd
+        # We have a waypoint for each broker, on the same testq address.
+        # Sending to testq should spread messages to the qpidd queues.
+        # Subscribing to testq should gather messages from the qpidd queues.
         router_conf = Qdrouterd.Config([
-            ('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
-            ('log', { 'module':'ROUTER', 'level':'TRACE' }),
-            ('log', { 'module':'MESSAGE', 'level':'TRACE' }),
+            ('log', {'module':'DEFAULT', 'level':'NOTICE'}),
+            ('log', {'module':'ROUTER', 'level':'TRACE'}),
+            ('log', {'module':'MESSAGE', 'level':'TRACE'}),
             ('container', {'container-name':self.id()}),
             ('container', {'container-name':self.id()}),
-            ('router', { 'mode': 'standalone', 'router-id': self.id() }),
+            ('router', {'mode': 'standalone', 'router-id': self.id()}),
             ('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
-            ('connector', {'name':'qpidd0', 'addr':'0.0.0.0', 'port':qpidd[0].port}),
-            ('connector', {'name':'qpidd1', 'addr':'0.0.0.0', 'port':qpidd[1].port}),
-            ('fixed-address', {'prefix':'testq', 'phase':0, 'fanout':'single', 'bias':'closest'}),
-            ('fixed-address', {'prefix':'testq', 'phase':1, 'fanout':'single', 'bias':'closest'}),
-            ('waypoint', {'name':'testq', 'out-phase':1, 'in-phase':0, 'connector':'qpidd0'})
+            ('fixed-address', {'prefix':testq, 'phase':0, 'fanout':'single', 'bias':'spread'}),
+            ('fixed-address', {'prefix':testq, 'phase':1, 'fanout':'single', 'bias':'spread'})
         ])
+        # Add connector and waypoint for each broker.
+        for q in self.qpidd:
+            router_conf += [
+                ('connector', {'name':q.name, 'addr':'0.0.0.0', 'port':q.port}),
+                ('waypoint', {'name':testq, 'out-phase':1, 'in-phase':0, 'connector':q.name})]
+
         router = self.qdrouterd('router0', router_conf)
         wait_ports(router.ports)
-        retry(lambda: router.is_connected(qpidd[0].port))
+        for q in self.qpidd:
+            retry(lambda: router.is_connected(q.port))
 
-        # Test for waypoint routing via queue
-        m=self.message(address=router.addresses[0]+"/"+testq, body="FOO")
         msgr = self.messenger()
-        msgr.subscribe(m.address)
-        msgr.put(m)
-        msgr.send()
-        msg = Message()
-        msgr.recv(1)
-        msgr.get(msg)
-        msgr.accept()
-        msgr.flush()
-        self.assertEqual(msg.body, m.body)
-        aq = qpidd[0].agent.getQueue(testq)
-        self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
 
-if __name__ == '__main__': unittest.main()
+        address = router.addresses[0]+"/"+testq
+        msgr.subscribe(address, flush=True)
+        n = 20                  # Messages per broker
+        r = range(n*len(self.qpidd))
+        for i in r:
+            msgr.put(message(address=address, body=i))
+        messages = sorted(msgr.fetch().body for i in r)
+        msgr.flush()
+        self.assertEqual(messages, r)
+        # Verify we got back exactly what we sent.
+        qs = [q.agent.getQueue(testq) for q in self.qpidd]
+        enq = sum(q.msgTotalEnqueues for q in qs)
+        deq = sum(q.msgTotalDequeues for q in qs)
+        self.assertEquals((enq, deq), (len(r), len(r)))
+        # Verify the messages were spread equally over the brokers.
+        self.assertEquals(
+            [(q.msgTotalEnqueues, q.msgTotalDequeues) for q in qs],
+            [(n, n) for q in qs]
+        )
+
+if __name__ == '__main__':
+    if MISSING_REQUIREMENTS:
+        print MISSING_REQUIREMENTS
+    else:
+        unittest.main()

Modified: qpid/dispatch/trunk/tools/qdtest.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Thu May 15 16:27:08 2014
@@ -33,5 +33,8 @@ python $QPID_DISPATCH_HOME/tests/system_
 echo "Running system_tests_two_routers.py with SSL"
 python $QPID_DISPATCH_HOME/tests/system_tests_two_routers.py -v --ssl
 
+echo "Running system_tests_broker.py"
+python $QPID_DISPATCH_HOME/tests/system_tests_broker.py -v
+
 echo "Running qdstat_test.sh"
 bash $QPID_DISPATCH_HOME/tests/qdstat_test.sh



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org