You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/05/15 18:27:08 UTC
svn commit: r1594967 - in /qpid/dispatch/trunk: src/waypoint_private.h
tests/CMakeLists.txt tests/system_test.py tests/system_tests_broker.py
tools/qdtest.in
Author: aconway
Date: Thu May 15 16:27:08 2014
New Revision: 1594967
URL: http://svn.apache.org/r1594967
Log:
DISPATCH-52: Integrate system_tests_broker, fix outstanding issues.
system_tests_broker:
- Finish 3 node simple workqueue test, passing. Add to qdtest.sh.
- Skip tests if requirements (qpidd, qpid_messaging etc.) not available.
- Start brokers in setUpClass, re-use for all tests (currently only 1)
system_test:
- Add auto-flush to Messenger
- Support for setUpClass, tearDownClass including workaround for python 2.6
Developer doc:
- Add overview description of waypoints
Ran pylint on both
Modified:
qpid/dispatch/trunk/src/waypoint_private.h
qpid/dispatch/trunk/tests/CMakeLists.txt
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_broker.py
qpid/dispatch/trunk/tools/qdtest.in
Modified: qpid/dispatch/trunk/src/waypoint_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint_private.h?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint_private.h (original)
+++ qpid/dispatch/trunk/src/waypoint_private.h Thu May 15 16:27:08 2014
@@ -25,8 +25,23 @@
/**
* @file
- * A waypoint is a point on a multi-phase route where messages can exit and re-enter the router.
- * For example after being sent through an external broker's queue.
+ *
+ * A waypoint sends/receives messages to/from an external entity such as a
+ * broker as part of a multi-phase address.
+ *
+ * An address can have multiple phases. Each phase acts like a separate address,
+ * but sharing the same address string.
+ *
+ * Phases are not visible to normal senders/receivers, they are set by
+ * waypoints. Messages from normal senders go to the phase=0 address. Normal
+ * subscribers subscribe to the highest phase defined for the address.
+ *
+ * A waypoint takes messages for its in-phase and sends them to the external
+ * entity. Messages received from the external entity are given the waypoint's
+ * out-phase. Waypoints can be "chained" with the out-phase of one equal to the
+ * in-phase for the next. Thus waypoints provide a way to route messages via
+ * multiple external entities between a sender and a subscriber using the same
+ * address.
*/
void qd_waypoint_activate_all(qd_dispatch_t *qd);
Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Thu May 15 16:27:08 2014
@@ -53,7 +53,7 @@ add_test(unit_tests_size_1 unit_test
add_test(unit_tests unit_tests ${CMAKE_CURRENT_SOURCE_DIR}/threads4.conf)
add_test(router_tests python ${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v)
-set(SYSTEM_TEST_FILES system_tests_one_router.py system_tests_two_routers.py)
+set(SYSTEM_TEST_FILES system_test.py system_tests_one_router.py system_tests_two_routers.py system_tests_broker.py)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_build.sh.in ${CMAKE_CURRENT_BINARY_DIR}/config_build.sh)
Modified: qpid/dispatch/trunk/tests/system_test.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Thu May 15 16:27:08 2014
@@ -26,9 +26,7 @@ Features:
- Tools to manipulate qpidd and qdrouter configuration files.
- Sundry other tools.
-FIXME aconway 2014-03-27: we need to check what is installed & skip tests that can't be run.
-
-Current we assume the following are installed:
+Requires the following:
- proton with python bindings
- qpidd with AMQP 1.0 support
- qpidtoollibs python module from qpid/tools
@@ -40,12 +38,12 @@ You can set this up from packages on fed
Here's how to build from source assuming you use default install prefix /usr/local
+With a qpid-proton checkout at $PROTON
+ cd $PROTON/<build-directory>; make install
With a qpid checkout at $QPID:
cd $QPID/qpid/cpp/<build-directory>; make install
cd $QPID/qpid/tools; ./setup.py install --prefix /usr/local
cd $QPID/qpid/python; ./setup.py install --prefix /usr/local
-With a qpid-proton checkout at $PROTON
- cd $PROTON/<build-directory>; make install
And finally make sure to set up your environment:
@@ -54,27 +52,70 @@ export PYTHONPATH="$PYTHONPATH:/usr/loca
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64"
"""
-import sys, os, time, socket, random
-import subprocess, tempfile, shutil
-import unittest
-import qpidtoollibs
-import qpid_messaging as qm
-import proton
-from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
+import os, time, socket, random, subprocess, shutil, unittest
from copy import copy
+import proton
+from proton import Message
+
+# Optional modules
+MISSING_MODULES = []
+
+try:
+ import qpidtoollibs
+except ImportError, err:
+ qpidtoollibs = None # pylint: disable=invalid-name
+ MISSING_MODULES.append(str(err))
+
+try:
+ import qpid_messaging as qm
+except ImportError, err:
+ qm = None # pylint: disable=invalid-name
+ MISSING_MODULES.append(str(err))
+
+def find_exe(program):
+ """Find an executable in the system PATH"""
+ def is_exe(fpath):
+ """True if fpath is executable"""
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+ mydir = os.path.split(program)[0]
+ if mydir:
+ if is_exe(program):
+ return program
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file):
+ return exe_file
+ return None
+
+
+def _check_requirements():
+ """If requirements are missing, return a message, else return empty string."""
+ missing = MISSING_MODULES
+ required_exes = ['qpidd', 'qdrouterd']
+ missing += ["No exectuable %s"%e for e in required_exes if not find_exe(e)]
+ if find_exe('qpidd'):
+ p = subprocess.Popen(['qpidd', '--help'], stdout=subprocess.PIPE)
+ if not "AMQP 1.0" in p.communicate()[0]:
+ missing.append("No AMQP 1.0 support in qpidd")
+ if missing:
+ return "%s: %s"%(__name__, ", ".join(missing))
+
+MISSING_REQUIREMENTS = _check_requirements()
-def retry_delay(deadline, timeout, delay, max_delay):
+def retry_delay(deadline, delay, max_delay):
"""For internal use in retry. Sleep as required
and return the new delay or None if retry should time out"""
remaining = deadline - time.time()
- if remaining <= 0: return None
+ if remaining <= 0:
+ return None
time.sleep(min(delay, remaining))
return min(delay*2, max_delay)
-default_timeout=float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
+DEFAULT_TIMEOUT = float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
-def retry(function, timeout=default_timeout, delay=.001, max_delay=1):
+def retry(function, timeout=DEFAULT_TIMEOUT, delay=.001, max_delay=1):
"""Call function until it returns a true value or timeout expires.
Double the delay for each retry up to max_delay.
Returns what function returns or None if timeout expires.
@@ -85,10 +126,11 @@ def retry(function, timeout=default_time
if ret:
return ret
else:
- delay = retry_delay(deadline, timeout, delay, max_delay)
- if delay is None: return None
+ delay = retry_delay(deadline, delay, max_delay)
+ if delay is None:
+ return None
-def retry_exception(function, timeout=default_timeout, delay=.001, max_delay=1, exception_test=None):
+def retry_exception(function, timeout=DEFAULT_TIMEOUT, delay=.001, max_delay=1, exception_test=None):
"""Call function until it returns without exception or timeout expires.
Double the delay for each retry up to max_delay.
Calls exception_test with any exception raised by function, exception_test
@@ -100,10 +142,12 @@ def retry_exception(function, timeout=de
while True:
try:
return function()
- except Exception, e:
- if exception_test: exception_test(e)
- delay = retry_delay(deadline, timeout, delay, max_delay)
- if delay is None: raise
+ except Exception, e: # pylint: disable=broad-except
+ if exception_test:
+ exception_test(e)
+ delay = retry_delay(deadline, delay, max_delay)
+ if delay is None:
+ raise
def port_available(port, host='0.0.0.0'):
"""Return true if connecting to host:port gives 'connection refused'."""
@@ -113,17 +157,21 @@ def port_available(port, host='0.0.0.0')
s.close()
except socket.error, e:
return e.errno == 111
- except: pass
+ except:
+ pass
return False
def wait_port(port, host='0.0.0.0', **retry_kwargs):
"""Wait up to timeout for port (on host) to be connectable.
Takes same keyword arguments as retry to control the timeout"""
- def check(e): # Only retry on connection refused
- if not isinstance(e, socket.error) or not e.errno == 111: raise
+ def check(e):
+ """Only retry on connection refused"""
+ if not isinstance(e, socket.error) or not e.errno == 111:
+ raise
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try: retry_exception(lambda: s.connect((host, port)), exception_test=check,
- **retry_kwargs)
+ try:
+ retry_exception(lambda: s.connect((host, port)), exception_test=check,
+ **retry_kwargs)
except Exception, e:
raise Exception("wait_port timeout on %s:%s: %s"%(host, port, e))
@@ -132,21 +180,23 @@ def wait_port(port, host='0.0.0.0', **re
def wait_ports(ports, host="127.0.0.1", **retry_kwargs):
"""Wait up to timeout for all ports (on host) to be connectable.
Takes same keyword arguments as retry to control the timeout"""
- for p in ports: wait_port(p)
+ for p in ports:
+ wait_port(p, host=host, **retry_kwargs)
def message(**properties):
"""Convenience to create a proton.Message with properties set"""
m = Message()
- for name, value in properties.iteritems(): setattr(m, name, value)
+ for name, value in properties.iteritems():
+ setattr(m, name, value)
return m
class Process(subprocess.Popen):
"""Popen that can be torn down at the end of a TestCase and stores its output."""
# Expected states of a Process at teardown
- RUNNING=1 # Still running
- EXIT_OK=2 # Exit status 0
- EXIT_FAIL=3 # Exit status not 0
+ RUNNING = 1 # Still running
+ EXIT_OK = 2 # Exit status 0
+ EXIT_FAIL = 3 # Exit status not 0
def __init__(self, name, args, expect=EXIT_OK, **kwargs):
self.name, self.args, self.expect = name, args, expect
@@ -155,24 +205,36 @@ class Process(subprocess.Popen):
super(Process, self).__init__(
args, stdout=self.out, stderr=subprocess.STDOUT, **kwargs)
- def assert_running(self): assert self.poll() is None, "%s exited"%name
+ def assert_running(self):
+ """Assert that the proces is still running"""
+ assert self.poll() is None, "%s exited" % self.name
def teardown(self):
- if self.torndown: return
+ """Check process status and stop the process if necessary"""
+ if self.torndown:
+ return
self.torndown = True
status = self.poll()
- if status is None: self.kill()
+ if status is None:
+ self.kill()
self.out.close()
self.check_exit(status)
def check_exit(self, status):
+ """Check process exit status"""
def check(condition, expect):
- if status is None: actual="still running"
- else: actual="exit %s"%status
+ """assert condition with a suitable message for status"""
+ if status is None:
+ actual = "still running"
+ else:
+ actual = "exit %s"%status
assert condition, "Expected %s but %s: %s"%(expect, actual, self.name)
- if self.expect == Process.RUNNING: check(status is None, "still running"),
- elif self.expect == Process.EXIT_OK: check(status == 0, "exit 0"),
- elif self.expect == Process.EXIT_FAIL: check(status != 0, "exit non-0")
+ if self.expect == Process.RUNNING:
+ check(status is None, "still running")
+ elif self.expect == Process.EXIT_OK:
+ check(status == 0, "exit 0")
+ elif self.expect == Process.EXIT_FAIL:
+ check(status != 0, "exit non-0")
class Config(object):
"""Base class for configuration objects that provide a convenient
@@ -181,7 +243,8 @@ class Config(object):
def write(self, name, suffix=".conf"):
"""Write the config object to file name.suffix. Returns name.suffix."""
name = name+suffix
- with open(name,'w') as f: f.write(str(self))
+ with open(name, 'w') as f:
+ f.write(str(self))
return name
@@ -189,30 +252,32 @@ class Qdrouterd(Process):
"""Run a Qpid Dispatch Router Daemon"""
class Config(list, Config):
- """List of ('section', {'name':'value',...}).
+ """List of ('section', {'name':'value', ...}).
Fills in some default values automatically, see Qdrouterd.DEFAULTS
"""
DEFAULTS = {
'listener':{'sasl-mechanisms':'ANONYMOUS'},
- 'connector':{'sasl-mechanisms':'ANONYMOUS','role':'on-demand'}
+ 'connector':{'sasl-mechanisms':'ANONYMOUS', 'role':'on-demand'}
}
def sections(self, name):
"""Return list of sections named name"""
- return [p for n,p in self if n == name]
-
- def _defs(self, name, props):
- """Fill in defaults for required values"""
- if not name in Qdrouterd.Config.DEFAULTS: return props
- p = copy(Qdrouterd.Config.DEFAULTS[name])
- p.update(props);
- return p
+ return [p for n, p in self if n == name]
def __str__(self):
"""Generate config file content. Fills in defaults for some require values"""
- def props(p): return "".join([" %s: %s\n"%(k,v) for k,v in p.iteritems()])
- return "".join(["%s {\n%s}\n"%(n,props(self._defs(n,p))) for n,p in self])
+ def defs(name, props):
+ """Fill in defaults for required values"""
+ if not name in Qdrouterd.Config.DEFAULTS:
+ return props
+ p = copy(Qdrouterd.Config.DEFAULTS[name])
+ p.update(props)
+ return p
+ def props(p):
+ """qpidd.conf format of dict p"""
+ return "".join([" %s: %s\n"%(k, v) for k, v in p.iteritems()])
+ return "".join(["%s {\n%s}\n"%(n, props(defs(n, p))) for n, p in self])
class Agent(object):
"""Management agent"""
@@ -224,19 +289,19 @@ class Qdrouterd(Process):
self.subscription = self.messenger.subscribe("amqp:/#")
self.reply_to = self.subscription.address
- def stop(self): self.messenger.stop()
-
- def get(self, type):
- """Return a list of attribute dicts for each instance of type"""
- request = message(address=self.address, reply_to=self.reply_to,
- correlation_id=1,
- properties={u'operation':u'QUERY', u'entityType':type},
- body={'attributeNames':[]})
- response = Message()
+ def stop(self):
+ """Stop the agent's messenger"""
+ self.messenger.stop()
+
+ def get(self, entity_type):
+ """Return a list of attribute dicts for each instance of entity_type"""
+ request = message(
+ address=self.address, reply_to=self.reply_to,
+ correlation_id=1,
+ properties={u'operation':u'QUERY', u'entityType':entity_type},
+ body={'attributeNames':[]})
self.messenger.put(request)
- self.messenger.send()
- self.messenger.recv(1)
- self.messenger.get(response)
+ response = self.messenger.fetch()
if response.properties['statusCode'] != 200:
raise Exception("Agent error: %d %s" % (
response.properties['statusCode'],
@@ -245,7 +310,7 @@ class Qdrouterd(Process):
return [dict(zip(attrs, values)) for values in response.body['results']]
- def __init__(self, name, config, **kwargs):
+ def __init__(self, name, config=Config()):
self.config = copy(config)
super(Qdrouterd, self).__init__(
name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
@@ -253,29 +318,33 @@ class Qdrouterd(Process):
@property
def agent(self):
- if not self._agent: self._agent = self.Agent(self)
+ """Return an management Agent for this router"""
+ if not self._agent:
+ self._agent = self.Agent(self)
return self._agent
def teardown(self):
- if self._agent: self._agent.stop()
+ if self._agent:
+ self._agent.stop()
super(Qdrouterd, self).teardown()
@property
def ports(self):
"""Return list of configured ports for all listeners"""
- return [ l['port'] for l in self.config.sections('listener') ]
+ return [l['port'] for l in self.config.sections('listener')]
@property
def addresses(self):
"""Return amqp://host:port addresses for all listeners"""
- return [ "amqp://%s:%s"%(l['addr'],l['port']) for l in self.config.sections('listener') ]
+ return ["amqp://%s:%s"%(l['addr'], l['port']) for l in self.config.sections('listener')]
def is_connected(self, port, host='0.0.0.0'):
"""If router has a connection to host:port return the management info.
Otherwise return None"""
connections = self.agent.get('org.apache.qpid.dispatch.connection')
for c in connections:
- if c['name'] == '%s:%s'%(host, port): return c
+ if c['name'] == '%s:%s'%(host, port):
+ return c
return None
@@ -283,17 +352,18 @@ class Qpidd(Process):
"""Run a Qpid Daemon"""
class Config(dict, Config):
-
+ """qpidd.conf contents. Use like a dict, str() generates qpidd.conf format"""
def __str__(self):
- return "".join(["%s=%s\n"%(k,v) for k,v in self.iteritems()])
-
+ return "".join(["%s=%s\n"%(k, v) for k, v in self.iteritems()])
- def __init__(self, name, config):
+ def __init__(self, name, config=Config(), port=None):
self.config = Qpidd.Config(
{'auth':'no',
'log-to-stderr':'false', 'log-to-file':name+".log",
'data-dir':name+".data"})
self.config.update(config)
+ if port:
+ self.config['port'] = port
super(Qpidd, self).__init__(
name, ['qpidd', '--config', self.config.write(name)], expect=Process.RUNNING)
self.port = self.config['port'] or 5672
@@ -302,71 +372,100 @@ class Qpidd(Process):
def qm_connect(self):
"""Make a qpid_messaging connection to the broker"""
+ if not qm:
+ raise Exception("No qpid_messaging module available")
return qm.Connection.establish(self.address)
@property
def agent(self, **kwargs):
- if not self._agent: self._agent = qpidtoollibs.BrokerAgent(self.qm_connect())
+ """Get the management agent for this broker"""
+ if not qpidtoollibs:
+ raise Exception("No qpidtoollibs module available")
+ if not self._agent:
+ self._agent = qpidtoollibs.BrokerAgent(self.qm_connect(), **kwargs)
return self._agent
-class Messenger(proton.Messenger):
- """Minor additions to Messenger for tests"""
+# Decorator to add an optional flush argument to a method, defaulting to
+# the _flush value for the messenger.
+def flush_arg(method):
+ """Decorator for Messenger methods that adds an optional flush argument,
+ defaulting to the Messenger default"""
+ def wrapper(self, *args, **kwargs):
+ """Wrapper that adds flush argument"""
+ flush = self._flush # pylint: disable=protected-access
+ if 'flush' in kwargs:
+ flush = kwargs['flush']
+ del kwargs['flush']
+ r = method(self, *args, **kwargs)
+ if flush:
+ self.flush()
+ return r
+ return wrapper
+
+class Messenger(proton.Messenger): # pylint: disable=too-many-public-methods
+ """Convenience additions to proton.Messenger"""
+
+ def __init__(self, name=None, timeout=DEFAULT_TIMEOUT, blocking=True, flush=False):
+ super(Messenger, self).__init__(name)
+ self.timeout = timeout
+ self.blocking = blocking
+ self._flush = flush
def flush(self):
"""Call work() till there is no work left."""
- while self.work(0.01): pass
+ while self.work(0.01):
+ pass
- def subscribe(self, source):
- """proton.Messenger.subscribe and work till subscription is visible."""
- t = proton.Messenger.subscribe(self, source)
- self.flush()
- return t
-
-class TestCase(unittest.TestCase):
- """A test case that creates a separate directory for each test and
- cleans up during teardown."""
+ @flush_arg
+ def fetch(self, accept=True):
+ """Fetch a single message"""
+ msg = Message()
+ self.recv(1)
+ self.get(msg)
+ if accept:
+ self.accept()
+ return msg
+
+ put = flush_arg(proton.Messenger.put)
+ subscribe = flush_arg(proton.Messenger.subscribe)
+
+class Tester(object):
+ """Tools for use by TestCase
+- Create a directory for the test.
+- Utilities to create processes and servers, manage ports etc.
+- Clean up processes on teardown"""
def __init__(self, *args, **kwargs):
- super(TestCase, self).__init__(*args, **kwargs)
- self.save_dir = os.getcwd()
- # self.id() is normally _module[.module].TestClass.test_name
- id = self.id().split(".")
- if len(id) == 1: # Not the expected format, just use dir = id.
- dir = id[0]
- else: # use dir = module[.module].TestClass/test_name
- dir = os.path.join(".".join(id[0:-1]), id[-1])
- shutil.rmtree(dir, ignore_errors=True)
- os.makedirs(dir)
- os.chdir(dir)
self.cleanup_list = []
- self.port_range = (20000, 30000)
- self.next_port = random.randint(*self.port_range)
+ self.save_dir = None
+ self.directory = None
- def tearDown(self):
- os.chdir(self.save_dir)
+ def setup(self, directory):
+ """Create directory"""
+ self.directory = directory
+ shutil.rmtree(directory, ignore_errors=True)
+ os.makedirs(directory)
+ self.save_dir = os.getcwd()
+ os.chdir(directory)
+
+ def teardown(self):
+ """Clean up (tear-down, stop or close) objects recorded via cleanup()"""
self.cleanup_list.reverse()
for t in self.cleanup_list:
for m in ["teardown", "tearDown", "stop", "close"]:
a = getattr(t, m, None)
- if a: a(); break
-
- def cleanup(self, x): self.cleanup_list.append(x); return x
+ if a:
+ a()
+ break
+ os.chdir(self.save_dir)
- def get_port(self):
- """Get an unused port"""
- def advance(): # Advance with wrap-around
- self.next_port += 1
- if self.next_port >= self.port_range[1]: self.next_port = port_range[0]
- start = self.next_port
- while not port_available(self.next_port):
- advance()
- if self.next_port == start:
- raise Exception("No avaliable ports in range %s", self.port_range)
- p = self.next_port;
- advance()
- return p
+ def cleanup(self, x):
+ """Record object x for clean-up during tear-down.
+ x should have on of the methods teardown, tearDown, stop or close"""
+ self.cleanup_list.append(x)
+ return x
def popen(self, *args, **kwargs):
"""Start a Process that will be cleaned up on teardown"""
@@ -380,16 +479,78 @@ class TestCase(unittest.TestCase):
"""Return a Qpidd that will be cleaned up on teardown"""
return self.cleanup(Qpidd(*args, **kwargs))
- def messenger(self, name="test-messenger", timeout=default_timeout, blocking=True, cleanup=True):
+ def messenger(self, name=None, cleanup=True, **kwargs):
"""Return a started Messenger that will be cleaned up on teardown."""
- m = Messenger(name)
- m.timeout = timeout
- m.blocking = blocking
+ m = Messenger(name or os.path.basename(self.directory), **kwargs)
m.start()
- if cleanup: self.cleanup(m)
+ if cleanup:
+ self.cleanup(m)
return m
- def message(self, **properties):
- """Convenience to create a proton.Message with properties set"""
- global message
- return message(**properties)
+ port_range = (20000, 30000)
+ next_port = random.randint(port_range[0], port_range[1])
+
+ @classmethod
+ def get_port(cls):
+ """Get an unused port"""
+ def advance():
+ """Advance with wrap-around"""
+ cls.next_port += 1
+ if cls.next_port >= cls.port_range[1]:
+ cls.next_port = cls.port_range[0]
+ start = cls.next_port
+ while not port_available(cls.next_port):
+ advance()
+ if cls.next_port == start:
+ raise Exception("No avaliable ports in range %s", cls.port_range)
+ p = cls.next_port
+ advance()
+ return p
+
+
+class TestCase(unittest.TestCase, Tester): # pylint: disable=too-many-public-methods
+ """A TestCase that sets up its own working directory and is also a Tester."""
+
+ def __init__(self, test_method):
+ unittest.TestCase.__init__(self, test_method)
+ Tester.__init__(self)
+
+ @classmethod
+ def base_dir(cls):
+ return os.path.abspath(os.path.join(__name__, cls.__name__))
+
+ @classmethod
+ def setUpClass(cls):
+ shutil.rmtree(cls.base_dir(), ignore_errors=True) # Clear old test tree.
+ cls.tester = Tester()
+ cls.tester.setup(os.path.join(cls.base_dir(), 'setup_class'))
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tester.teardown()
+
+ def setUp(self):
+ # self.id() is normally the fully qualified method name
+ Tester.setup(self, os.path.join(self.base_dir(), self.id().split(".")[-1]))
+
+ def tearDown(self):
+ Tester.teardown(self)
+
+ def skipTest(self, reason):
+ """Workaround missing unittest.TestCase.skipTest in python 2.6.
+ The caller must return in order to end the test"""
+ if hasattr(unittest.TestCase, 'skipTest'):
+ self.skipTest(reason)
+ else:
+ print "Skipping test", id(), reason
+
+ # Hack to support setUpClass/tearDownClass on older versions of python.
+ # The default TestLoader sorts tests alphabetically so we insert
+ # fake tests that will run first and last to call the class setup/teardown functions.
+ if not hasattr(unittest.TestCase, 'setUpClass'):
+ def test_0000_setup_class(self):
+ """Fake test to call setUpClass"""
+ self.__class__.setUpClass()
+ def test_zzzz_teardown_class(self):
+ """Fake test to call tearDownClass"""
+ self.__class__.tearDownClass()
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Thu May 15 16:27:08 2014
@@ -17,70 +17,94 @@
# under the License.
#
-"""System tests involving one or more brokers and dispatch routers
-
-FIXME aconway 2014-04-29:
-
-These tests is a work in progress, they do not pass
-and they are not run by the qdtest script.
-
-They are provided as an example of how to use the system_test module.
-
-To run the tests from a dispatch checkout:
- . config.sh; python tests/system_tests_broker.py
-Note the tests wil
"""
+System tests involving one or more brokers and dispatch routers integrated
+with waypoints.
+"""
+import unittest, system_test
+from system_test import wait_port, wait_ports, Qdrouterd, retry, message, MISSING_REQUIREMENTS
-from system_test import *
-
-class BrokerSystemTest(TestCase):
-
- def test_broker(self):
- testq = 'testq'
+class BrokerSystemTest(system_test.TestCase): # pylint: disable=too-many-public-methods
+ """System tests involving routers and qpidd brokers"""
- # Start two qpidd brokers called qpidd0 and qpidd1
- qpidd = [
- self.qpidd('qpidd%s'%i,
- Qpidd.Config({'port':self.get_port(), 'trace':1}))
- for i in xrange(2) ]
-
- # FIXME aconway 2014-05-13: router waypoint connection seems fragile
- # unless everything is set up beforehand.
- wait_ports([q.port for q in qpidd])
- qpidd[0].agent.addQueue(testq)
+ # Hack for python 2.6 which does not support setupClass.
+ # We set setup_ok = true in setupClass, and skip all tests if it's not true.
+ setup_ok = False
+
+ @classmethod
+ def setUpClass(cls):
+ """Start 3 qpidd brokers, wait for them to be ready."""
+ super(BrokerSystemTest, cls).setUpClass()
+ cls.qpidd = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
+ for i in xrange(3)]
+ for q in cls.qpidd:
+ wait_port(q.port)
+ cls.setup_ok = True
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.setup_ok:
+ cls.setup_ok = False
+ super(BrokerSystemTest, cls).tearDownClass()
+
+ def test_distrbuted_queue(self):
+ """Static distributed queue, one router, three brokers"""
+ if not self.setup_ok:
+ return self.skipTest("setUpClass failed")
+ testq = self.id() # The distributed queue name
+ for q in self.qpidd:
+ q.agent.addQueue(testq)
# Start a qdrouterd
+ # We have a waypoint for each broker, on the same testq address.
+ # Sending to testq should spread messages to the qpidd queues.
+ # Subscribing to testq should gather messages from the qpidd queues.
router_conf = Qdrouterd.Config([
- ('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
- ('log', { 'module':'ROUTER', 'level':'TRACE' }),
- ('log', { 'module':'MESSAGE', 'level':'TRACE' }),
+ ('log', {'module':'DEFAULT', 'level':'NOTICE'}),
+ ('log', {'module':'ROUTER', 'level':'TRACE'}),
+ ('log', {'module':'MESSAGE', 'level':'TRACE'}),
('container', {'container-name':self.id()}),
('container', {'container-name':self.id()}),
- ('router', { 'mode': 'standalone', 'router-id': self.id() }),
+ ('router', {'mode': 'standalone', 'router-id': self.id()}),
('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
- ('connector', {'name':'qpidd0', 'addr':'0.0.0.0', 'port':qpidd[0].port}),
- ('connector', {'name':'qpidd1', 'addr':'0.0.0.0', 'port':qpidd[1].port}),
- ('fixed-address', {'prefix':'testq', 'phase':0, 'fanout':'single', 'bias':'closest'}),
- ('fixed-address', {'prefix':'testq', 'phase':1, 'fanout':'single', 'bias':'closest'}),
- ('waypoint', {'name':'testq', 'out-phase':1, 'in-phase':0, 'connector':'qpidd0'})
+ ('fixed-address', {'prefix':testq, 'phase':0, 'fanout':'single', 'bias':'spread'}),
+ ('fixed-address', {'prefix':testq, 'phase':1, 'fanout':'single', 'bias':'spread'})
])
+ # Add connector and waypoint for each broker.
+ for q in self.qpidd:
+ router_conf += [
+ ('connector', {'name':q.name, 'addr':'0.0.0.0', 'port':q.port}),
+ ('waypoint', {'name':testq, 'out-phase':1, 'in-phase':0, 'connector':q.name})]
+
router = self.qdrouterd('router0', router_conf)
wait_ports(router.ports)
- retry(lambda: router.is_connected(qpidd[0].port))
+ for q in self.qpidd:
+ retry(lambda: router.is_connected(q.port))
- # Test for waypoint routing via queue
- m=self.message(address=router.addresses[0]+"/"+testq, body="FOO")
msgr = self.messenger()
- msgr.subscribe(m.address)
- msgr.put(m)
- msgr.send()
- msg = Message()
- msgr.recv(1)
- msgr.get(msg)
- msgr.accept()
- msgr.flush()
- self.assertEqual(msg.body, m.body)
- aq = qpidd[0].agent.getQueue(testq)
- self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
-if __name__ == '__main__': unittest.main()
+ address = router.addresses[0]+"/"+testq
+ msgr.subscribe(address, flush=True)
+ n = 20 # Messages per broker
+ r = range(n*len(self.qpidd))
+ for i in r:
+ msgr.put(message(address=address, body=i))
+ messages = sorted(msgr.fetch().body for i in r)
+ msgr.flush()
+ self.assertEqual(messages, r)
+ # Verify we got back exactly what we sent.
+ qs = [q.agent.getQueue(testq) for q in self.qpidd]
+ enq = sum(q.msgTotalEnqueues for q in qs)
+ deq = sum(q.msgTotalDequeues for q in qs)
+ self.assertEquals((enq, deq), (len(r), len(r)))
+ # Verify the messages were spread equally over the brokers.
+ self.assertEquals(
+ [(q.msgTotalEnqueues, q.msgTotalDequeues) for q in qs],
+ [(n, n) for q in qs]
+ )
+
+if __name__ == '__main__':
+ if MISSING_REQUIREMENTS:
+ print MISSING_REQUIREMENTS
+ else:
+ unittest.main()
Modified: qpid/dispatch/trunk/tools/qdtest.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1594967&r1=1594966&r2=1594967&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Thu May 15 16:27:08 2014
@@ -33,5 +33,8 @@ python $QPID_DISPATCH_HOME/tests/system_
echo "Running system_tests_two_routers.py with SSL"
python $QPID_DISPATCH_HOME/tests/system_tests_two_routers.py -v --ssl
+echo "Running system_tests_broker.py"
+python $QPID_DISPATCH_HOME/tests/system_tests_broker.py -v
+
echo "Running qdstat_test.sh"
bash $QPID_DISPATCH_HOME/tests/qdstat_test.sh
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org