You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/12 16:47:59 UTC

qpid-dispatch git commit: NO-JIRA: add more tests for the edge router

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master fafd1d618 -> 15b70c433


NO-JIRA: add more tests for the edge router


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/15b70c43
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/15b70c43
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/15b70c43

Branch: refs/heads/master
Commit: 15b70c43379c4fe730922d10425ef96d76ce9eeb
Parents: fafd1d6
Author: Kenneth Giusti <kg...@apache.org>
Authored: Thu Dec 6 15:37:23 2018 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Wed Dec 12 11:44:14 2018 -0500

----------------------------------------------------------------------
 tests/system_test.py              |  9 ++--
 tests/system_tests_edge_router.py | 93 +++++++++++++++++++++++++++++++++-
 tests/test_broker.py              | 30 ++++++++---
 3 files changed, 119 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index ef15959..8820cc2 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -717,11 +717,14 @@ def main_module():
 class AsyncTestReceiver(MessagingHandler):
     """
     A simple receiver that runs in the background and queues any received
-    messages.  Messages can be retrieved from this thread via the queue member
+    messages.  Messages can be retrieved from this thread via the queue member.
+    :param wait: block the constructor until the link has been fully
+                 established.
     """
     Empty = Queue.Empty
 
-    def __init__(self, address, source, conn_args=None, container_id=None):
+    def __init__(self, address, source, conn_args=None, container_id=None,
+                 wait=True):
         super(AsyncTestReceiver, self).__init__()
         self.address = address
         self.source = source
@@ -736,7 +739,7 @@ class AsyncTestReceiver(MessagingHandler):
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._thread.start()
-        if self._ready.wait(timeout=TIMEOUT) is False:
+        if wait and self._ready.wait(timeout=TIMEOUT) is False:
             raise Exception("Timed out waiting for receiver start")
 
     def _main(self):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index b84e214..3e2ead5 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 from time import sleep
+from threading import Event
 from threading import Timer
 
 import unittest2 as unittest
@@ -30,10 +31,13 @@ from proton import Message, Timeout
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
 from system_test import AsyncTestReceiver
 from system_test import AsyncTestSender
+from system_test import QdManager
 from system_tests_link_routes import ConnLinkRouteService
 from test_broker import FakeService
+from test_broker import FakeBrokerStop
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
 from qpid_dispatch.management.client import Node
 from subprocess import PIPE, STDOUT
 import re
@@ -736,6 +740,91 @@ class LinkRouteProxyTest(TestCase):
             tr.queue.get(timeout=TIMEOUT)
         tr.stop()
 
+    def _validate_topology(self, router, expected_links, address):
+        """
+        query existing links and verify they are set up as expected
+        """
+        mgmt = QdManager(self, address=router)
+        # fetch all the connections
+        cl = mgmt.query('org.apache.qpid.dispatch.connection')
+        # map them by their identity
+        conns = dict([(c['identity'], c) for c in cl])
+
+        # now fetch all links for the address
+        ll = mgmt.query('org.apache.qpid.dispatch.router.link')
+        test_links = [l for l in ll if
+                      l.get('owningAddr', '').find(address) != -1]
+        self.assertEqual(len(expected_links), len(test_links))
+
+        for elink in expected_links:
+            matches = filter(lambda l: (l['linkDir'] == elink[0]
+                                        and
+                                        conns[l['connectionId']]['container'] == elink[1]
+                                        and
+                                        conns[l['connectionId']]['role'] == elink[2]),
+                             test_links)
+            self.assertTrue(len(matches) == 1)
+
+    def test_link_topology(self):
+        """
+        Verify that the link topology that results from activating a link route
+        and sending traffic is correct
+        """
+        fs = FakeService(self.EA1.route_container)
+        self.INT_B.wait_address("CfgLinkRoute1")
+
+        # create a sender on one edge and the receiver on another
+        bc_b = BlockingConnection(self.EB1.listener, timeout=TIMEOUT)
+        erx = bc_b.create_receiver(address="CfgLinkRoute1/buhbye", credit=10)
+        bc_a = BlockingConnection(self.EA1.listener, timeout=TIMEOUT)
+        etx = bc_a.create_sender(address="CfgLinkRoute1/buhbye")
+
+        etx.send(Message(body="HI THERE"), timeout=TIMEOUT)
+        self.assertEqual("HI THERE", erx.receive(timeout=TIMEOUT).body)
+        erx.accept()
+
+        # expect the following links have been established for the
+        # "CfgLinkRoute1/buhbye" address:
+
+        # EA1
+        #   1 out link to   INT.A       (connection role: edge)
+        #   1 in  link from bc_a        (normal)
+        #   1 in  link from FakeBroker  (route-container)
+        #   1 out link to   FakeBroker  (route-container)
+        # INT.A
+        #   1 in  link from EA1         (edge)
+        #   1 out link to   INT.B       (inter-router)
+        # INT.B
+        #   1 out link to   EB1         (edge)
+        #   1 in  link from INT.A       (inter-router)
+        # EB1
+        #   1 out link to   bc_b        (normal)
+        #   1 in  link from INT.B       (edge)
+
+        expect = {
+            self.EA1.listener: [
+                ('in',  bc_a.container.container_id, 'normal'),
+                ('in',  'FakeBroker', 'route-container'),
+                ('out', 'FakeBroker', 'route-container'),
+                ('out', 'INT.A',      'edge')],
+            self.INT_A.listener: [
+                ('in',  'EA1',        'edge'),
+                ('out', 'INT.B',      'inter-router')],
+            self.INT_B.listener: [
+                ('in',  'INT.A',      'inter-router'),
+                ('out', 'EB1',        'edge')],
+            self.EB1.listener: [
+                ('in',  'INT.B',      'edge'),
+                ('out', bc_b.container.container_id, 'normal')]
+            }
+        for router, expected_links in expect.items():
+            self._validate_topology(router, expected_links,
+                                    'CfgLinkRoute1/buhbye')
+
+        fs.join()
+        self.assertEqual(1, fs.in_count)
+        self.assertEqual(1, fs.out_count)
+
     def test_link_route_proxy_configured(self):
         """
         Activate the configured link routes via a FakeService, verify proxies
@@ -777,7 +866,7 @@ class LinkRouteProxyTest(TestCase):
         configured some link routes.  Then have clients on the interior
         exchange messages via the fake service.
         """
-        fs = ConnLinkRouteService(self.EA1.addresses[1],
+        fs = ConnLinkRouteService(self.EA1.route_container,
                                   container_id="FakeService",
                                   config = [("ConnLinkRoute1",
                                              {"pattern": "Conn/*/One",
@@ -866,7 +955,7 @@ class LinkRouteProxyTest(TestCase):
 
         # activate the pre-configured link routes
         ea1_mgmt = self.EA1.management
-        fs = FakeService(self.EA1.addresses[1])
+        fs = FakeService(self.EA1.route_container)
         self.INT_B.wait_address("CfgLinkRoute1")
 
         for i in range(10):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/test_broker.py
----------------------------------------------------------------------
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 0b9bdfa..a05df02 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -39,6 +39,11 @@ from proton.reactor import AtMostOnce
 from system_test import TIMEOUT
 
 
+class FakeBrokerStop(Exception):
+    """
+    Raise from a handler callback to stop the broker abruptly.
+
+    FakeBroker._main() catches this exception around its processing loop and
+    returns; when raised from that loop, the acceptor and connections are
+    not closed.
+    """
+    pass
+
+
 class FakeBroker(MessagingHandler):
     """
     A fake broker-like service that listens for client connections
@@ -104,14 +109,23 @@ class FakeBroker(MessagingHandler):
     def _main(self):
         self._container.timeout = 1.0
         self._container.start()
-        while self._container.process():
-            if self._stop_thread:
-                if self.acceptor:
-                    self.acceptor.close()
-                    self.acceptor = None
-                for c in self._connections:
-                    c.close()
-                self._connections = []
+
+        try:
+            while self._container.process():
+                if self._stop_thread:
+                    break
+
+            if self.acceptor:
+                self.acceptor.close()
+                self.acceptor = None
+            for c in self._connections:
+                c.close()
+            self._connections = []
+            self._container.process()
+        except FakeBrokerStop:
+            # this abruptly kills the broker, which is useful for testing how
+            # dispatch deals with hung/stopped containers
+            pass
 
     def join(self):
         self._stop_thread = True


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org