You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/05/14 16:45:54 UTC
svn commit: r1594588 - in /qpid/dispatch/trunk: src/waypoint.c
tests/config_build.sh.in tests/system_test.py tests/system_tests_broker.py
Author: aconway
Date: Wed May 14 14:45:54 2014
New Revision: 1594588
URL: http://svn.apache.org/r1594588
Log:
QPID-DISPATCH-52: Automated system test of dispatch sending messages through a broker queue.
Using waypoints, the test sends a message into dispatch, out to a broker queue,
back to dispatch fromt the queue and delivers to a dispatch subscriber.
Gotchas:
- broker and broker queue must be set up before starting dispatch.
- must wait for dispatch to connect to the broker before sending message.
Improvements:
- system_tests.TestCase.get_port: check if port is used before returning it.
- Qdrouter.is_connected: test if router has a connection using management info.
- additional logging in waypoint.c
- fixed test/config_build.sh env to find qdstat
Modified:
qpid/dispatch/trunk/src/waypoint.c
qpid/dispatch/trunk/tests/config_build.sh.in
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_broker.py
Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Wed May 14 14:45:54 2014
@@ -40,6 +40,8 @@ struct qd_waypoint_context_t {
qd_waypoint_ref_list_t refs;
};
+// Convenience for logging waypoint messages, expects qd and wp to be defined.
+#define LOG(LEVEL, MSG, ...) qd_log(qd->router->log_source, QD_LOG_##LEVEL, "Waypoint '%s': " MSG, wp->name, ##__VA_ARGS__)
static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
{
@@ -47,11 +49,13 @@ static void qd_waypoint_visit_sink_LH(qd
qd_address_t *addr = wp->in_address;
char unused;
+ LOG(TRACE, "Visit sink");
//
// If the waypoint has no in-address, look it up in the hash table or create
// a new one and put it in the hash table.
//
if (!addr) {
+ LOG(TRACE, "Create sink address, in-phase %c", wp->in_phase);
//
// Compose the phased-address and search the routing table for the address.
// If it's not found, add it to the table but leave the link/router linkages empty.
@@ -76,8 +80,10 @@ static void qd_waypoint_visit_sink_LH(qd
qd_field_iterator_free(iter);
}
- if (!wp->connected)
+ if (!wp->connected) {
+ LOG(TRACE, "Start on-demand sink connector");
qd_connection_manager_start_on_demand(qd, wp->connector);
+ }
else if (!wp->out_link) {
wp->out_link = qd_link(router->node, wp->connection, QD_OUTGOING, wp->name);
pn_terminus_set_address(qd_link_target(wp->out_link), wp->name);
@@ -109,7 +115,7 @@ static void qd_waypoint_visit_sink_LH(qd
pn_link_open(qd_link_pn(wp->out_link));
qd_link_activate(wp->out_link);
- qd_log(router->log_source, QD_LOG_DEBUG, "Added outgoing link for waypoint: %s", wp->name);
+ LOG(TRACE, "Create sink out-link '%s'", pn_link_name(qd_link_pn(wp->out_link)));
}
}
@@ -120,11 +126,13 @@ static void qd_waypoint_visit_source_LH(
qd_address_t *addr = wp->out_address;
char unused;
+ LOG(TRACE, "Visit source");
//
// If the waypoint has no out-address, look it up in the hash table or create
// a new one and put it in the hash table.
//
if (!addr) {
+ LOG(TRACE, "Create source address, out-phase %c", wp->out_phase);
//
// Compose the phased-address and search the routing table for the address.
// If it's not found, add it to the table but leave the link/router linkages empty.
@@ -149,8 +157,10 @@ static void qd_waypoint_visit_source_LH(
qd_field_iterator_free(iter);
}
- if (!wp->connected)
+ if (!wp->connected) {
+ LOG(TRACE, "Start source on-demand connector");
qd_connection_manager_start_on_demand(qd, wp->connector);
+ }
else if (!wp->in_link) {
wp->in_link = qd_link(router->node, wp->connection, QD_INCOMING, wp->name);
pn_terminus_set_address(qd_link_source(wp->in_link), wp->name);
@@ -174,10 +184,9 @@ static void qd_waypoint_visit_source_LH(
pn_link_open(qd_link_pn(wp->in_link));
qd_link_activate(wp->in_link);
- qd_log(router->log_source, QD_LOG_DEBUG, "Added incoming link for waypoint: %s", wp->name);
+ LOG(TRACE, "Create source in-link '%s'", pn_link_name(qd_link_pn(wp->in_link)));
}
-
- if (DEQ_SIZE(addr->rlinks) + DEQ_SIZE(addr->rnodes) > 0) {
+ if (wp->in_link && (DEQ_SIZE(addr->rlinks) + DEQ_SIZE(addr->rnodes) > 0)) {
//
// CASE: This address has reachable destinations in the network.
// If there is no inbound link from the waypoint source,
@@ -185,7 +194,7 @@ static void qd_waypoint_visit_source_LH(
//
pn_link_flow(qd_link_pn(wp->in_link), 1);
qd_link_activate(wp->in_link);
- qd_log(router->log_source, QD_LOG_DEBUG, "Added credit for incoming link for waypoint: %s", wp->name);
+ LOG(DEBUG, "Added credit for incoming link '%s'", pn_link_name(qd_link_pn(wp->in_link)));
} else {
//
// CASE: This address has no reachable destinations in the network.
@@ -248,9 +257,7 @@ void qd_waypoint_activate_all(qd_dispatc
if (!wp->connector) {
wp->connector = qd_connection_manager_find_on_demand(qd, wp->connector_name);
if (!wp->connector) {
- qd_log(qd->router->log_source, QD_LOG_ERROR,
- "In waypoint '%s', on-demand connector '%s' not found",
- wp->name, wp->connector_name);
+ LOG(ERROR, "On-demand connector '%s' not found", wp->connector_name);
continue;
}
@@ -281,7 +288,7 @@ void qd_waypoint_connection_opened(qd_di
{
qd_waypoint_context_t *context = (qd_waypoint_context_t*) qd_config_connector_context(cc);
- qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector opened: %s",
+ qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector '%s' opened",
qd_config_connector_name(cc));
sys_mutex_lock(qd->router->lock);
@@ -314,6 +321,7 @@ void qd_waypoint_link_closed(qd_dispatch
void qd_waypoint_address_updated_LH(qd_dispatch_t *qd, qd_address_t *addr)
{
qd_waypoint_t *wp = DEQ_HEAD(qd->router->waypoints);
+ LOG(TRACE, "Address updated");
while (wp) {
if (wp->out_address == addr)
qd_waypoint_visit_LH(qd, wp);
Modified: qpid/dispatch/trunk/tests/config_build.sh.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config_build.sh.in?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config_build.sh.in (original)
+++ qpid/dispatch/trunk/tests/config_build.sh.in Wed May 14 14:45:54 2014
@@ -22,4 +22,4 @@ export SOURCE_DIR=${CMAKE_SOURCE_DIR}
export BUILD_DIR=${CMAKE_BINARY_DIR}
export PYTHONPATH=$SOURCE_DIR/python:$SOURCE_DIR/tests:$PYTHONPATH
-export PATH=$BUILD_DIR:$BUILD_DIR/router:$SOURCE_DIR/bin:$PATH
+export PATH=$BUILD_DIR:$BUILD_DIR/router:$BUILD_DIR/tools:$SOURCE_DIR/bin:$PATH
Modified: qpid/dispatch/trunk/tests/system_test.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Wed May 14 14:45:54 2014
@@ -71,7 +71,10 @@ def retry_delay(deadline, timeout, delay
time.sleep(min(delay, remaining))
return min(delay*2, max_delay)
-def retry(function, timeout=10, delay=.001, max_delay=1):
+
+default_timeout=float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
+
+def retry(function, timeout=default_timeout, delay=.001, max_delay=1):
"""Call function until it returns a true value or timeout expires.
Double the delay for each retry up to max_delay.
Returns what function returns or None if timeout expires.
@@ -85,7 +88,7 @@ def retry(function, timeout=10, delay=.0
delay = retry_delay(deadline, timeout, delay, max_delay)
if delay is None: return None
-def retry_exception(function, timeout=10, delay=.001, max_delay=1, exception_test=None):
+def retry_exception(function, timeout=default_timeout, delay=.001, max_delay=1, exception_test=None):
"""Call function until it returns without exception or timeout expires.
Double the delay for each retry up to max_delay.
Calls exception_test with any exception raised by function, exception_test
@@ -102,7 +105,18 @@ def retry_exception(function, timeout=10
delay = retry_delay(deadline, timeout, delay, max_delay)
if delay is None: raise
-def wait_port(port, host="127.0.0.1", **retry_kwargs):
+def port_available(port, host='0.0.0.0'):
+ """Return true if connecting to host:port gives 'connection refused'."""
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ s.connect((host, port))
+ s.close()
+ except socket.error, e:
+ return e.errno == 111
+ except: pass
+ return False
+
+def wait_port(port, host='0.0.0.0', **retry_kwargs):
"""Wait up to timeout for port (on host) to be connectable.
Takes same keyword arguments as retry to control the timeout"""
def check(e): # Only retry on connection refused
@@ -120,6 +134,12 @@ def wait_ports(ports, host="127.0.0.1",
Takes same keyword arguments as retry to control the timeout"""
for p in ports: wait_port(p)
+def message(**properties):
+ """Convenience to create a proton.Message with properties set"""
+ m = Message()
+ for name, value in properties.iteritems(): setattr(m, name, value)
+ return m
+
class Process(subprocess.Popen):
"""Popen that can be torn down at the end of a TestCase and stores its output."""
@@ -194,10 +214,51 @@ class Qdrouterd(Process):
def props(p): return "".join([" %s: %s\n"%(k,v) for k,v in p.iteritems()])
return "".join(["%s {\n%s}\n"%(n,props(self._defs(n,p))) for n,p in self])
+ class Agent(object):
+ """Management agent"""
+ def __init__(self, router):
+ self.router = router
+ self.messenger = Messenger()
+ self.messenger.route("amqp:/*", "amqp://0.0.0.0:%s/$1"%router.ports[0])
+ self.address = "amqp:/$management"
+ self.subscription = self.messenger.subscribe("amqp:/#")
+ self.reply_to = self.subscription.address
+
+ def stop(self): self.messenger.stop()
+
+ def get(self, type):
+ """Return a list of attribute dicts for each instance of type"""
+ request = message(address=self.address, reply_to=self.reply_to,
+ correlation_id=1,
+ properties={u'operation':u'QUERY', u'entityType':type},
+ body={'attributeNames':[]})
+ response = Message()
+ self.messenger.put(request)
+ self.messenger.send()
+ self.messenger.recv(1)
+ self.messenger.get(response)
+ if response.properties['statusCode'] != 200:
+ raise Exception("Agent error: %d %s" % (
+ response.properties['statusCode'],
+ response.properties['statusDescription']))
+ attrs = response.body['attributeNames']
+ return [dict(zip(attrs, values)) for values in response.body['results']]
+
+
def __init__(self, name, config, **kwargs):
self.config = copy(config)
super(Qdrouterd, self).__init__(
name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
+ self._agent = None
+
+ @property
+ def agent(self):
+ if not self._agent: self._agent = self.Agent(self)
+ return self._agent
+
+ def teardown(self):
+ if self._agent: self._agent.stop()
+ super(Qdrouterd, self).teardown()
@property
def ports(self):
@@ -206,13 +267,16 @@ class Qdrouterd(Process):
@property
def addresses(self):
- """Return host:port addresses for all listeners"""
+ """Return amqp://host:port addresses for all listeners"""
return [ "amqp://%s:%s"%(l['addr'],l['port']) for l in self.config.sections('listener') ]
- @property
- def address(self):
- """Return address of the first listener"""
-
+ def is_connected(self, port, host='0.0.0.0'):
+ """If router has a connection to host:port return the management info.
+ Otherwise return None"""
+ connections = self.agent.get('org.apache.qpid.dispatch.connection')
+ for c in connections:
+ if c['name'] == '%s:%s'%(host, port): return c
+ return None
class Qpidd(Process):
@@ -238,7 +302,7 @@ class Qpidd(Process):
def qm_connect(self):
"""Make a qpid_messaging connection to the broker"""
- qm.Connection.establish(self.address)
+ return qm.Connection.establish(self.address)
@property
def agent(self, **kwargs):
@@ -264,7 +328,8 @@ class TestCase(unittest.TestCase):
"""A test case that creates a separate directory for each test and
cleans up during teardown."""
- def setUp(self):
+ def __init__(self, *args, **kwargs):
+ super(TestCase, self).__init__(*args, **kwargs)
self.save_dir = os.getcwd()
# self.id() is normally _module[.module].TestClass.test_name
id = self.id().split(".")
@@ -272,12 +337,12 @@ class TestCase(unittest.TestCase):
dir = id[0]
else: # use dir = module[.module].TestClass/test_name
dir = os.path.join(".".join(id[0:-1]), id[-1])
- shutil.rmtree(dir, ignore_errors=True) # FIXME aconway 2014-03-27: wrong place
+ shutil.rmtree(dir, ignore_errors=True)
os.makedirs(dir)
os.chdir(dir)
self.cleanup_list = []
- # FIXME aconway 2014-04-29: need a safer (configurable?) way to pick ports.
- self.next_port = random.randint(30000,40000)
+ self.port_range = (20000, 30000)
+ self.next_port = random.randint(*self.port_range)
def tearDown(self):
os.chdir(self.save_dir)
@@ -290,9 +355,17 @@ class TestCase(unittest.TestCase):
def cleanup(self, x): self.cleanup_list.append(x); return x
def get_port(self):
- """Get a (hopefully) unused port"""
+ """Get an unused port"""
+ def advance(): # Advance with wrap-around
+ self.next_port += 1
+ if self.next_port >= self.port_range[1]: self.next_port = port_range[0]
+ start = self.next_port
+ while not port_available(self.next_port):
+ advance()
+ if self.next_port == start:
+ raise Exception("No avaliable ports in range %s", self.port_range)
p = self.next_port;
- self.next_port += 1;
+ advance()
return p
def popen(self, *args, **kwargs):
@@ -307,16 +380,16 @@ class TestCase(unittest.TestCase):
"""Return a Qpidd that will be cleaned up on teardown"""
return self.cleanup(Qpidd(*args, **kwargs))
- def messenger(self, name="test-messenger", timeout=1):
+ def messenger(self, name="test-messenger", timeout=default_timeout, blocking=True, cleanup=True):
"""Return a started Messenger that will be cleaned up on teardown."""
m = Messenger(name)
m.timeout = timeout
+ m.blocking = blocking
m.start()
- self.cleanup(m)
+ if cleanup: self.cleanup(m)
return m
def message(self, **properties):
"""Convenience to create a proton.Message with properties set"""
- m = Message()
- for name, value in properties.iteritems(): setattr(m, name, value)
- return m
+ global message
+ return message(**properties)
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Wed May 14 14:45:54 2014
@@ -36,8 +36,6 @@ from system_test import *
class BrokerSystemTest(TestCase):
def test_broker(self):
-
-
testq = 'testq'
# Start two qpidd brokers called qpidd0 and qpidd1
@@ -46,6 +44,11 @@ class BrokerSystemTest(TestCase):
Qpidd.Config({'port':self.get_port(), 'trace':1}))
for i in xrange(2) ]
+ # FIXME aconway 2014-05-13: router waypoint connection seems fragile
+ # unless everything is set up beforehand.
+ wait_ports([q.port for q in qpidd])
+ qpidd[0].agent.addQueue(testq)
+
# Start a qdrouterd
router_conf = Qdrouterd.Config([
('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
@@ -55,34 +58,29 @@ class BrokerSystemTest(TestCase):
('container', {'container-name':self.id()}),
('router', { 'mode': 'standalone', 'router-id': self.id() }),
('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
- ('connector', {'name':'qpidd0', 'addr':'localhost', 'port':qpidd[0].port}),
- ('connector', {'name':'qpidd1', 'addr':'localhost', 'port':qpidd[1].port}),
- ('fixed-address', {'prefix':'/testme/', 'fanout':'multiple'}),
- ('waypoint', {'name':testq, 'out-phase':1, 'connector':'qpidd0'})
+ ('connector', {'name':'qpidd0', 'addr':'0.0.0.0', 'port':qpidd[0].port}),
+ ('connector', {'name':'qpidd1', 'addr':'0.0.0.0', 'port':qpidd[1].port}),
+ ('fixed-address', {'prefix':'testq', 'phase':0, 'fanout':'single', 'bias':'closest'}),
+ ('fixed-address', {'prefix':'testq', 'phase':1, 'fanout':'single', 'bias':'closest'}),
+ ('waypoint', {'name':'testq', 'out-phase':1, 'in-phase':0, 'connector':'qpidd0'})
])
router = self.qdrouterd('router0', router_conf)
-
- # Wait for broker & router to be ready
- wait_ports([q.port for q in qpidd] + router.ports)
- qpidd[0].agent.addQueue(testq)
+ wait_ports(router.ports)
+ retry(lambda: router.is_connected(qpidd[0].port))
# Test for waypoint routing via queue
- m=self.message(address=router.addresses[0]+"/"+testq, body="c")
+ m=self.message(address=router.addresses[0]+"/"+testq, body="FOO")
msgr = self.messenger()
- time.sleep(3) # FIXME aconway 2014-05-07: race on router
msgr.subscribe(m.address)
- time.sleep(3) # FIXME aconway 2014-05-07: race on router
msgr.put(m)
msgr.send()
msg = Message()
msgr.recv(1)
msgr.get(msg)
msgr.accept()
+ msgr.flush()
self.assertEqual(msg.body, m.body)
aq = qpidd[0].agent.getQueue(testq)
- aq.update()
self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
- # FIXME aconway 2014-05-05: test for waypoint routing via queue
-
if __name__ == '__main__': unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org