You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/05/14 16:45:54 UTC

svn commit: r1594588 - in /qpid/dispatch/trunk: src/waypoint.c tests/config_build.sh.in tests/system_test.py tests/system_tests_broker.py

Author: aconway
Date: Wed May 14 14:45:54 2014
New Revision: 1594588

URL: http://svn.apache.org/r1594588
Log:
QPID-DISPATCH-52: Automated system test of dispatch sending messages through a broker queue.

Using waypoints, the test sends a message into dispatch, out to a broker queue,
back to dispatch fromt the queue and delivers to a dispatch subscriber.

Gotchas:
- broker and broker queue must be set up before starting dispatch.
- must wait for dispatch to connect to the broker before sending message.

Improvements:
- system_tests.TestCase.get_port: check if port is used before returning it.
- Qdrouter.is_connected: test if router has a connection using management info.
- additional logging in waypoint.c
- fixed test/config_build.sh env to find qdstat

Modified:
    qpid/dispatch/trunk/src/waypoint.c
    qpid/dispatch/trunk/tests/config_build.sh.in
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_broker.py

Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Wed May 14 14:45:54 2014
@@ -40,6 +40,8 @@ struct qd_waypoint_context_t {
     qd_waypoint_ref_list_t refs;
 };
 
+// Convenience for logging waypoint messages, expects qd and wp to be defined.
+#define LOG(LEVEL, MSG, ...) qd_log(qd->router->log_source, QD_LOG_##LEVEL, "Waypoint '%s': " MSG, wp->name, ##__VA_ARGS__)
 
 static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
 {
@@ -47,11 +49,13 @@ static void qd_waypoint_visit_sink_LH(qd
     qd_address_t *addr   = wp->in_address;
     char          unused;
 
+    LOG(TRACE, "Visit sink");
     //
     // If the waypoint has no in-address, look it up in the hash table or create
     // a new one and put it in the hash table.
     //
     if (!addr) {
+	LOG(TRACE, "Create sink address, in-phase %c", wp->in_phase);
         //
         // Compose the phased-address and search the routing table for the address.
         // If it's not found, add it to the table but leave the link/router linkages empty.
@@ -76,8 +80,10 @@ static void qd_waypoint_visit_sink_LH(qd
         qd_field_iterator_free(iter);
     }
 
-    if (!wp->connected)
+    if (!wp->connected) {
+	LOG(TRACE, "Start on-demand sink connector");
         qd_connection_manager_start_on_demand(qd, wp->connector);
+    }
     else if (!wp->out_link) {
         wp->out_link = qd_link(router->node, wp->connection, QD_OUTGOING, wp->name);
         pn_terminus_set_address(qd_link_target(wp->out_link), wp->name);
@@ -109,7 +115,7 @@ static void qd_waypoint_visit_sink_LH(qd
         pn_link_open(qd_link_pn(wp->out_link));
         qd_link_activate(wp->out_link);
 
-        qd_log(router->log_source, QD_LOG_DEBUG, "Added outgoing link for waypoint: %s", wp->name);
+	LOG(TRACE, "Create sink out-link '%s'", pn_link_name(qd_link_pn(wp->out_link)));
     }
 }
 
@@ -120,11 +126,13 @@ static void qd_waypoint_visit_source_LH(
     qd_address_t *addr   = wp->out_address;
     char          unused;
 
+    LOG(TRACE, "Visit source");
     //
     // If the waypoint has no out-address, look it up in the hash table or create
     // a new one and put it in the hash table.
     //
     if (!addr) {
+	LOG(TRACE, "Create source address, out-phase %c", wp->out_phase);
         //
         // Compose the phased-address and search the routing table for the address.
         // If it's not found, add it to the table but leave the link/router linkages empty.
@@ -149,8 +157,10 @@ static void qd_waypoint_visit_source_LH(
         qd_field_iterator_free(iter);
     }
 
-    if (!wp->connected)
+    if (!wp->connected) {
+	LOG(TRACE, "Start source on-demand connector");
         qd_connection_manager_start_on_demand(qd, wp->connector);
+    }
     else if (!wp->in_link) {
         wp->in_link = qd_link(router->node, wp->connection, QD_INCOMING, wp->name);
         pn_terminus_set_address(qd_link_source(wp->in_link), wp->name);
@@ -174,10 +184,9 @@ static void qd_waypoint_visit_source_LH(
         pn_link_open(qd_link_pn(wp->in_link));
         qd_link_activate(wp->in_link);
 
-        qd_log(router->log_source, QD_LOG_DEBUG, "Added incoming link for waypoint: %s", wp->name);
+	LOG(TRACE, "Create source in-link '%s'", pn_link_name(qd_link_pn(wp->in_link)));
     }
-
-    if (DEQ_SIZE(addr->rlinks) + DEQ_SIZE(addr->rnodes) > 0) {
+    if (wp->in_link && (DEQ_SIZE(addr->rlinks) + DEQ_SIZE(addr->rnodes) > 0)) {
         //
         // CASE: This address has reachable destinations in the network.
         //       If there is no inbound link from the waypoint source,
@@ -185,7 +194,7 @@ static void qd_waypoint_visit_source_LH(
         //
         pn_link_flow(qd_link_pn(wp->in_link), 1);
         qd_link_activate(wp->in_link);
-        qd_log(router->log_source, QD_LOG_DEBUG, "Added credit for incoming link for waypoint: %s", wp->name);
+        LOG(DEBUG, "Added credit for incoming link '%s'", pn_link_name(qd_link_pn(wp->in_link)));
     } else {
         //
         // CASE: This address has no reachable destinations in the network.
@@ -248,9 +257,7 @@ void qd_waypoint_activate_all(qd_dispatc
         if (!wp->connector) {
             wp->connector = qd_connection_manager_find_on_demand(qd, wp->connector_name);
             if (!wp->connector) {
-                qd_log(qd->router->log_source, QD_LOG_ERROR,
-                       "In waypoint '%s', on-demand connector '%s' not found",
-                       wp->name, wp->connector_name);
+                LOG(ERROR, "On-demand connector '%s' not found", wp->connector_name);
                 continue;
             }
 
@@ -281,7 +288,7 @@ void qd_waypoint_connection_opened(qd_di
 {
     qd_waypoint_context_t *context = (qd_waypoint_context_t*) qd_config_connector_context(cc);
 
-    qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector opened: %s",
+    qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector '%s' opened",
            qd_config_connector_name(cc));
 
     sys_mutex_lock(qd->router->lock);
@@ -314,6 +321,7 @@ void qd_waypoint_link_closed(qd_dispatch
 void qd_waypoint_address_updated_LH(qd_dispatch_t *qd, qd_address_t *addr)
 {
     qd_waypoint_t *wp = DEQ_HEAD(qd->router->waypoints);
+    LOG(TRACE, "Address updated");
     while (wp) {
         if (wp->out_address == addr)
             qd_waypoint_visit_LH(qd, wp);

Modified: qpid/dispatch/trunk/tests/config_build.sh.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config_build.sh.in?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config_build.sh.in (original)
+++ qpid/dispatch/trunk/tests/config_build.sh.in Wed May 14 14:45:54 2014
@@ -22,4 +22,4 @@ export SOURCE_DIR=${CMAKE_SOURCE_DIR}
 export BUILD_DIR=${CMAKE_BINARY_DIR}
 
 export PYTHONPATH=$SOURCE_DIR/python:$SOURCE_DIR/tests:$PYTHONPATH
-export PATH=$BUILD_DIR:$BUILD_DIR/router:$SOURCE_DIR/bin:$PATH
+export PATH=$BUILD_DIR:$BUILD_DIR/router:$BUILD_DIR/tools:$SOURCE_DIR/bin:$PATH

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Wed May 14 14:45:54 2014
@@ -71,7 +71,10 @@ def retry_delay(deadline, timeout, delay
     time.sleep(min(delay, remaining))
     return min(delay*2, max_delay)
 
-def retry(function, timeout=10, delay=.001, max_delay=1):
+
+default_timeout=float(os.environ.get("QPID_SYSTEM_TEST_TIMEOUT", 5))
+
+def retry(function, timeout=default_timeout, delay=.001, max_delay=1):
     """Call function until it returns a true value or timeout expires.
     Double the delay for each retry up to max_delay.
     Returns what function returns or None if timeout expires.
@@ -85,7 +88,7 @@ def retry(function, timeout=10, delay=.0
             delay = retry_delay(deadline, timeout, delay, max_delay)
             if delay is None: return None
 
-def retry_exception(function, timeout=10, delay=.001, max_delay=1, exception_test=None):
+def retry_exception(function, timeout=default_timeout, delay=.001, max_delay=1, exception_test=None):
     """Call function until it returns without exception or timeout expires.
     Double the delay for each retry up to max_delay.
     Calls exception_test with any exception raised by function, exception_test
@@ -102,7 +105,18 @@ def retry_exception(function, timeout=10
             delay = retry_delay(deadline, timeout, delay, max_delay)
             if delay is None: raise
 
-def wait_port(port, host="127.0.0.1", **retry_kwargs):
+def port_available(port, host='0.0.0.0'):
+    """Return true if connecting to host:port gives 'connection refused'."""
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        s.connect((host, port))
+        s.close()
+    except socket.error, e:
+        return e.errno == 111
+    except: pass
+    return False
+
+def wait_port(port, host='0.0.0.0', **retry_kwargs):
     """Wait up to timeout for port (on host) to be connectable.
     Takes same keyword arguments as retry to control the timeout"""
     def check(e):               # Only retry on connection refused
@@ -120,6 +134,12 @@ def wait_ports(ports, host="127.0.0.1", 
     Takes same keyword arguments as retry to control the timeout"""
     for p in ports: wait_port(p)
 
+def message(**properties):
+    """Convenience to create a proton.Message with properties set"""
+    m = Message()
+    for name, value in properties.iteritems(): setattr(m, name, value)
+    return m
+
 class Process(subprocess.Popen):
     """Popen that can be torn down at the end of a TestCase and stores its output."""
 
@@ -194,10 +214,51 @@ class Qdrouterd(Process):
             def props(p): return "".join(["    %s: %s\n"%(k,v) for k,v in p.iteritems()])
             return "".join(["%s {\n%s}\n"%(n,props(self._defs(n,p))) for n,p in self])
 
+    class Agent(object):
+        """Management agent"""
+        def __init__(self, router):
+            self.router = router
+            self.messenger = Messenger()
+            self.messenger.route("amqp:/*", "amqp://0.0.0.0:%s/$1"%router.ports[0])
+            self.address = "amqp:/$management"
+            self.subscription = self.messenger.subscribe("amqp:/#")
+            self.reply_to = self.subscription.address
+
+        def stop(self): self.messenger.stop()
+
+        def get(self, type):
+            """Return a list of attribute dicts for each instance of type"""
+            request = message(address=self.address, reply_to=self.reply_to,
+                              correlation_id=1,
+                              properties={u'operation':u'QUERY', u'entityType':type},
+                              body={'attributeNames':[]})
+            response = Message()
+            self.messenger.put(request)
+            self.messenger.send()
+            self.messenger.recv(1)
+            self.messenger.get(response)
+            if response.properties['statusCode'] != 200:
+                raise Exception("Agent error: %d %s" % (
+                    response.properties['statusCode'],
+                    response.properties['statusDescription']))
+            attrs = response.body['attributeNames']
+            return [dict(zip(attrs, values)) for values in response.body['results']]
+
+
     def __init__(self, name, config, **kwargs):
         self.config = copy(config)
         super(Qdrouterd, self).__init__(
             name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
+        self._agent = None
+
+    @property
+    def agent(self):
+        if not self._agent: self._agent = self.Agent(self)
+        return self._agent
+
+    def teardown(self):
+        if self._agent: self._agent.stop()
+        super(Qdrouterd, self).teardown()
 
     @property
     def ports(self):
@@ -206,13 +267,16 @@ class Qdrouterd(Process):
 
     @property
     def addresses(self):
-        """Return host:port addresses for all listeners"""
+        """Return amqp://host:port addresses for all listeners"""
         return [ "amqp://%s:%s"%(l['addr'],l['port']) for l in self.config.sections('listener') ]
 
-    @property
-    def address(self):
-        """Return address of the first listener"""
-        
+    def is_connected(self, port, host='0.0.0.0'):
+        """If router has a connection to host:port return the management info.
+        Otherwise return None"""
+        connections = self.agent.get('org.apache.qpid.dispatch.connection')
+        for c in connections:
+            if c['name'] == '%s:%s'%(host, port): return c
+        return None
 
 
 class Qpidd(Process):
@@ -238,7 +302,7 @@ class Qpidd(Process):
 
     def qm_connect(self):
         """Make a qpid_messaging connection to the broker"""
-        qm.Connection.establish(self.address)
+        return qm.Connection.establish(self.address)
 
     @property
     def agent(self, **kwargs):
@@ -264,7 +328,8 @@ class TestCase(unittest.TestCase):
     """A test case that creates a separate directory for each test and
     cleans up during teardown."""
 
-    def setUp(self):
+    def __init__(self, *args, **kwargs):
+        super(TestCase, self).__init__(*args, **kwargs)
         self.save_dir = os.getcwd()
         # self.id() is normally _module[.module].TestClass.test_name
         id = self.id().split(".")
@@ -272,12 +337,12 @@ class TestCase(unittest.TestCase):
             dir = id[0]
         else:                   # use dir = module[.module].TestClass/test_name
             dir = os.path.join(".".join(id[0:-1]), id[-1])
-        shutil.rmtree(dir, ignore_errors=True) # FIXME aconway 2014-03-27: wrong place
+        shutil.rmtree(dir, ignore_errors=True)
         os.makedirs(dir)
         os.chdir(dir)
         self.cleanup_list = []
-        # FIXME aconway 2014-04-29: need a safer (configurable?) way to pick ports.
-        self.next_port = random.randint(30000,40000)
+        self.port_range = (20000, 30000)
+        self.next_port = random.randint(*self.port_range)
 
     def tearDown(self):
         os.chdir(self.save_dir)
@@ -290,9 +355,17 @@ class TestCase(unittest.TestCase):
     def cleanup(self, x): self.cleanup_list.append(x); return x
 
     def get_port(self):
-        """Get a (hopefully) unused port"""
+        """Get an unused port"""
+        def advance():          # Advance with wrap-around
+            self.next_port += 1
+            if self.next_port >= self.port_range[1]: self.next_port = port_range[0]
+        start = self.next_port
+        while not port_available(self.next_port):
+            advance()
+            if self.next_port == start:
+                raise Exception("No avaliable ports in range %s", self.port_range)
         p = self.next_port;
-        self.next_port += 1;
+        advance()
         return p
 
     def popen(self, *args, **kwargs):
@@ -307,16 +380,16 @@ class TestCase(unittest.TestCase):
         """Return a Qpidd that will be cleaned up on teardown"""
         return self.cleanup(Qpidd(*args, **kwargs))
 
-    def messenger(self, name="test-messenger", timeout=1):
+    def messenger(self, name="test-messenger", timeout=default_timeout, blocking=True, cleanup=True):
         """Return a started Messenger that will be cleaned up on teardown."""
         m = Messenger(name)
         m.timeout = timeout
+        m.blocking = blocking
         m.start()
-        self.cleanup(m)
+        if cleanup: self.cleanup(m)
         return m
 
     def message(self, **properties):
         """Convenience to create a proton.Message with properties set"""
-        m = Message()
-        for name, value in properties.iteritems(): setattr(m, name, value)
-        return m
+        global message
+        return message(**properties)

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594588&r1=1594587&r2=1594588&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Wed May 14 14:45:54 2014
@@ -36,8 +36,6 @@ from system_test import *
 class BrokerSystemTest(TestCase):
 
     def test_broker(self):
-
-
         testq = 'testq'
 
         # Start two qpidd brokers called qpidd0 and qpidd1
@@ -46,6 +44,11 @@ class BrokerSystemTest(TestCase):
                        Qpidd.Config({'port':self.get_port(), 'trace':1}))
                   for i in xrange(2) ]
 
+        # FIXME aconway 2014-05-13: router waypoint connection seems fragile
+        # unless everything is set up beforehand.
+        wait_ports([q.port for q in qpidd])
+        qpidd[0].agent.addQueue(testq)
+
         # Start a qdrouterd
         router_conf = Qdrouterd.Config([
             ('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
@@ -55,34 +58,29 @@ class BrokerSystemTest(TestCase):
             ('container', {'container-name':self.id()}),
             ('router', { 'mode': 'standalone', 'router-id': self.id() }),
             ('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
-            ('connector', {'name':'qpidd0', 'addr':'localhost', 'port':qpidd[0].port}),
-            ('connector', {'name':'qpidd1', 'addr':'localhost', 'port':qpidd[1].port}),
-            ('fixed-address', {'prefix':'/testme/', 'fanout':'multiple'}),
-            ('waypoint', {'name':testq, 'out-phase':1, 'connector':'qpidd0'})
+            ('connector', {'name':'qpidd0', 'addr':'0.0.0.0', 'port':qpidd[0].port}),
+            ('connector', {'name':'qpidd1', 'addr':'0.0.0.0', 'port':qpidd[1].port}),
+            ('fixed-address', {'prefix':'testq', 'phase':0, 'fanout':'single', 'bias':'closest'}),
+            ('fixed-address', {'prefix':'testq', 'phase':1, 'fanout':'single', 'bias':'closest'}),
+            ('waypoint', {'name':'testq', 'out-phase':1, 'in-phase':0, 'connector':'qpidd0'})
         ])
         router = self.qdrouterd('router0', router_conf)
-
-        # Wait for broker & router to be ready
-        wait_ports([q.port for q in qpidd] + router.ports)
-        qpidd[0].agent.addQueue(testq)
+        wait_ports(router.ports)
+        retry(lambda: router.is_connected(qpidd[0].port))
 
         # Test for waypoint routing via queue
-        m=self.message(address=router.addresses[0]+"/"+testq, body="c")
+        m=self.message(address=router.addresses[0]+"/"+testq, body="FOO")
         msgr = self.messenger()
-        time.sleep(3)           # FIXME aconway 2014-05-07: race on router
         msgr.subscribe(m.address)
-        time.sleep(3)           # FIXME aconway 2014-05-07: race on router
         msgr.put(m)
         msgr.send()
         msg = Message()
         msgr.recv(1)
         msgr.get(msg)
         msgr.accept()
+        msgr.flush()
         self.assertEqual(msg.body, m.body)
         aq = qpidd[0].agent.getQueue(testq)
-        aq.update()
         self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
 
-        # FIXME aconway 2014-05-05: test for waypoint routing via queue
-
 if __name__ == '__main__': unittest.main()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org