You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/14 19:20:32 UTC

[qpid-dispatch] branch master updated: DISPATCH-1154: update link route proxy tests, add missing delivery reference

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new b594672  DISPATCH-1154: update link route proxy tests, add missing delivery reference
b594672 is described below

commit b594672e41856e626d3b15e849cf39bced836b3a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Dec 12 11:50:40 2018 -0500

    DISPATCH-1154: update link route proxy tests, add missing delivery reference
---
 src/router_core/core_client_api.c |   9 ++
 tests/system_test.py              |  25 +++-
 tests/system_tests_edge_router.py | 234 ++++++++++++++++++++++++--------------
 tests/test_broker.py              |  32 ++----
 4 files changed, 188 insertions(+), 112 deletions(-)

diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 9ef1c4e..7483869 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -286,6 +286,7 @@ static void _flush_send_queue_CT(qdrc_client_t *client)
         req->delivery = qdrc_endpoint_delivery_CT(client->core,
                                                   client->sender,
                                                   msg);
+        qdr_delivery_incref(req->delivery, "core client send request");
         qdrc_endpoint_send_CT(client->core,
                               client->sender,
                               req->delivery,
@@ -347,6 +348,10 @@ static void _free_request_CT(qdrc_client_t *client,
         qd_compose_free(req->app_properties);
     }
 
+    if (req->delivery) {
+        qdr_delivery_decref_CT(client->core, req->delivery, "core client send request");
+    }
+
     // notify user that the request has completed
     if (req->done_cb) {
         req->done_cb(client->core,
@@ -478,6 +483,10 @@ static void _sender_update_CT(void *context,
             DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req);
             req->on_unsettled_list = false;
 
+            // delivery no longer needed
+            qdr_delivery_decref_CT(client->core, req->delivery, "core client send request");
+            req->delivery = 0;
+
             if (!req->on_reply_list || disposition != PN_ACCEPTED) {
                 // no reply is coming, release the request
                 _free_request_CT(client, req, NULL);
diff --git a/tests/system_test.py b/tests/system_test.py
index 354eac6..bec6b7f 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -43,6 +43,7 @@ except ImportError:
 from threading import Thread
 from threading import Event
 import json
+import uuid
 
 import proton
 from proton import Message, Timeout
@@ -720,11 +721,12 @@ class AsyncTestReceiver(MessagingHandler):
     messages.  Messages can be retrieved from this thread via the queue member.
     :param wait: block the constructor until the link has been fully
                  established.
+    :param recover_link: restart on remote link detach
     """
     Empty = Queue.Empty
 
     def __init__(self, address, source, conn_args=None, container_id=None,
-                 wait=True):
+                 wait=True, recover_link=False):
         super(AsyncTestReceiver, self).__init__()
         self.address = address
         self.source = source
@@ -732,9 +734,11 @@ class AsyncTestReceiver(MessagingHandler):
         self.queue = Queue.Queue()
         self._conn = None
         self._container = Container(self)
-        if container_id is not None:
-            self._container.container_id = container_id
+        cid = container_id or "ATR-%s:%s" % (source, uuid.uuid4())
+        self._container.container_id = cid
         self._ready = Event()
+        self._recover_link = recover_link
+        self._recover_count = 0
         self._stop_thread = False
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
@@ -770,6 +774,17 @@ class AsyncTestReceiver(MessagingHandler):
     def on_link_opened(self, event):
         self._ready.set()
 
+    def on_link_closing(self, event):
+        event.link.close()
+        if self._recover_link and not self._stop_thread:
+            # lesson learned: the generated link name will be the same as the
+            # old link (which is bad) so we specify a new one
+            self._recover_count += 1
+            kwargs = {'source': self.source,
+                      'name': "%s:%s" % (event.link.name, self._recover_count)}
+            rcv = event.container.create_receiver(event.connection,
+                                                  **kwargs)
+
     def on_message(self, event):
         self.queue.put(event.message)
 
@@ -787,8 +802,8 @@ class AsyncTestSender(MessagingHandler):
         self._unaccepted = count
         self._body = body or "test"
         self._container = Container(self)
-        if container_id is not None:
-            self._container.container_id = container_id
+        cid = container_id or "ATS-%s:%s" % (target, uuid.uuid4())
+        self._container.container_id = cid
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._thread.start()
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 3e2ead5..f9a55d9 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -34,7 +34,6 @@ from system_test import AsyncTestSender
 from system_test import QdManager
 from system_tests_link_routes import ConnLinkRouteService
 from test_broker import FakeService
-from test_broker import FakeBrokerStop
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
 from proton.utils import BlockingConnection
@@ -723,16 +722,19 @@ class LinkRouteProxyTest(TestCase):
         cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector'
 
     def _get_address(self, router, address):
+        """Lookup address in route table"""
         a_type = 'org.apache.qpid.dispatch.router.address'
         addrs = router.management.query(a_type).get_dicts()
         return list(filter(lambda a: a['name'].find(address) != -1,
                            addrs))
 
     def _wait_address_gone(self, router, address):
+        """Block until address is removed from the route table"""
         while self._get_address(router, address):
             sleep(0.1)
 
     def _test_traffic(self, sender, receiver, address, count=5):
+        """Generate message traffic between two normal clients"""
         tr = AsyncTestReceiver(receiver, address)
         ts = AsyncTestSender(sender, address, count)
         ts.wait()  # wait until all sent
@@ -740,6 +742,91 @@ class LinkRouteProxyTest(TestCase):
             tr.queue.get(timeout=TIMEOUT)
         tr.stop()
 
+    def _test_attach_weirdness(self, service):
+        """Exercise a service that simulates link attach failures"""
+
+        # create a consumer, do not wait for link to open, reattach
+        # on received detach
+        rx = AsyncTestReceiver(self.EB1.listener, 'CfgLinkRoute1/foo',
+                               wait=False, recover_link=True)
+        service.link_dropped.wait(timeout=TIMEOUT)
+        service.join() # wait for thread exit
+        del service
+
+        # now attach a working service to the same address,
+        # make sure it all works
+        fs = FakeService(self.EA1.route_container)
+        self.INT_B.wait_address("CfgLinkRoute1")
+
+        if True:   # 
+            rx.stop()
+            fs.join()
+        else:
+            tx = AsyncTestSender(self.EA1.listener, 'CfgLinkRoute1/foo',
+
+                                 body="HEY HO LET'S GO!")
+            tx.wait()
+
+            msg = rx.queue.get(timeout=TIMEOUT)
+            self.assertTrue(msg.body == "HEY HO LET'S GO!")
+            rx.stop()
+            fs.join()
+            self.assertEqual(1, fs.in_count)
+            self.assertEqual(1, fs.out_count)
+
+    def test_01_immedate_detach_reattach(self):
+        """
+        Have a service for a link routed address abruptly detach
+        in response to an incoming link attach
+
+        The attaching client from EB1 will get an attach response then an
+        immediate detach.  The client will immediately re-establish the link.
+        """
+        class AttachDropper(FakeService):
+            def __init__(self, *args, **kwargs):
+                super(AttachDropper, self).__init__(*args, **kwargs)
+                self.link_dropped = Event()
+
+            def on_link_remote_open(self, event):
+                # drop it
+                event.link.close()
+                event.connection.close()
+                self.link_dropped.set()
+
+        ad = AttachDropper(self.EA1.route_container)
+        self.INT_B.wait_address("CfgLinkRoute1")
+        self._test_attach_weirdness(ad)
+
+    def test_02_thrashing_link_routes(self):
+        """
+        Rapidly add and delete link routes at the edge
+        """
+
+        # activate the pre-configured link routes
+        ea1_mgmt = self.EA1.management
+        fs = FakeService(self.EA1.route_container)
+        self.INT_B.wait_address("CfgLinkRoute1")
+
+        for i in range(10):
+            lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
+                                  name="TestLRout%d" % i,
+                                  attributes={'pattern': 'Test/*/%d/#' % i,
+                                              'containerId': 'FakeBroker',
+                                              'direction': 'out'})
+            lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
+                                  name="TestLRin%d" % i,
+                                  attributes={'pattern': 'Test/*/%d/#' % i,
+                                              'containerId': 'FakeBroker',
+                                              'direction': 'in'})
+            # verify that they are correctly propagated (once)
+            if i == 9:
+                self.INT_B.wait_address("Test/*/9/#")
+            lr1.delete()
+            lr2.delete()
+
+        fs.join()
+        self._wait_address_gone(self.INT_B, "CfgLinkRoute1")
+
     def _validate_topology(self, router, expected_links, address):
         """
         query existing links and verify they are set up as expected
@@ -765,7 +852,59 @@ class LinkRouteProxyTest(TestCase):
                              test_links)
             self.assertTrue(len(matches) == 1)
 
-    def test_link_topology(self):
+    def test_03_interior_conn_lost(self):
+        """
+        What happens when the interior connection bounces?
+        """
+        config = Qdrouterd.Config([('router', {'mode': 'edge',
+                                               'id': 'Edge1'}),
+                                   ('listener', {'role': 'normal',
+                                                 'port': self.tester.get_port()}),
+                                   ('listener', {'name': 'rc',
+                                                 'role': 'route-container',
+                                                 'port': self.tester.get_port()}),
+                                   ('linkRoute', {'pattern': 'Edge1/*',
+                                                  'containerId': 'FakeBroker',
+                                                  'direction': 'in'}),
+                                   ('linkRoute', {'pattern': 'Edge1/*',
+                                                  'containerId': 'FakeBroker',
+                                                  'direction': 'out'})])
+        er = self.tester.qdrouterd('Edge1', config, wait=True)
+
+        # activate the link routes before the connection exists
+        fs = FakeService(er.addresses[1])
+        er.wait_address("Edge1/*")
+
+        # create the connection to interior
+        er_mgmt = er.management
+        ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
+                              name='toA',
+                              attributes={'role': 'edge',
+                                          'port': self.INTA_edge_port})
+        self.INT_B.wait_address("Edge1/*")
+
+        # delete it, and verify the routes are removed
+        ctor.delete()
+        self._wait_address_gone(self.INT_B, "Edge1/*")
+
+        # now recreate and verify routes re-appear
+        ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
+                              name='toA',
+                              attributes={'role': 'edge',
+                                          'port': self.INTA_edge_port})
+        self.INT_B.wait_address("Edge1/*")
+        self._test_traffic(self.INT_B.listener,
+                           self.INT_B.listener,
+                           "Edge1/One",
+                           count=5)
+        fs.join()
+        self.assertEqual(5, fs.in_count)
+        self.assertEqual(5, fs.out_count)
+
+        er.teardown()
+        self._wait_address_gone(self.INT_B, "Edge1/*")
+
+    def test_50_link_topology(self):
         """
         Verify that the link topology that results from activating a link route
         and sending traffic is correct
@@ -825,7 +964,7 @@ class LinkRouteProxyTest(TestCase):
         self.assertEqual(1, fs.in_count)
         self.assertEqual(1, fs.out_count)
 
-    def test_link_route_proxy_configured(self):
+    def test_51_link_route_proxy_configured(self):
         """
         Activate the configured link routes via a FakeService, verify proxies
         created by passing traffic from/to and interior router
@@ -834,10 +973,12 @@ class LinkRouteProxyTest(TestCase):
 
         fs = FakeService(self.EA1.route_container)
         self.INT_B.wait_address("CfgLinkRoute1")
+
         self._test_traffic(self.INT_B.listener,
                            self.INT_B.listener,
                            "CfgLinkRoute1/hi",
                            count=5)
+
         fs.join()
         self.assertEqual(5, fs.in_count)
         self.assertEqual(5, fs.out_count)
@@ -850,16 +991,19 @@ class LinkRouteProxyTest(TestCase):
 
         fs = FakeService(self.EB1.route_container)
         self.INT_A.wait_address("*.cfg.pattern.#")
+
         self._test_traffic(self.INT_A.listener,
                            self.INT_A.listener,
                            "MATCH.cfg.pattern",
                            count=5)
+
         fs.join()
         self.assertEqual(5, fs.in_count)
         self.assertEqual(5, fs.out_count)
         self._wait_address_gone(self.INT_A, "*.cfg.pattern.#")
 
-    def test_conn_link_route_proxy(self):
+
+    def test_52_conn_link_route_proxy(self):
         """
         Test connection scoped link routes by connecting a fake service to the
         Edge via the route-container connection.  Have the fake service
@@ -896,88 +1040,6 @@ class LinkRouteProxyTest(TestCase):
 
         self._wait_address_gone(self.INT_A, "Conn/*/One")
 
-    def test_interior_conn_lost(self):
-        """
-        What happens when the interior connection bounces?
-        """
-        config = Qdrouterd.Config([('router', {'mode': 'edge',
-                                               'id': 'Edge1'}),
-                                   ('listener', {'role': 'normal',
-                                                 'port': self.tester.get_port()}),
-                                   ('listener', {'name': 'rc',
-                                                 'role': 'route-container',
-                                                 'port': self.tester.get_port()}),
-                                   ('linkRoute', {'pattern': 'Edge1/*',
-                                                  'containerId': 'FakeBroker',
-                                                  'direction': 'in'}),
-                                   ('linkRoute', {'pattern': 'Edge1/*',
-                                                  'containerId': 'FakeBroker',
-                                                  'direction': 'out'})])
-        er = self.tester.qdrouterd('Edge1', config, wait=True)
-
-        # activate the link routes before the connection exists
-        fs = FakeService(er.addresses[1])
-        er.wait_address("Edge1/*")
-
-        # create the connection to interior
-        er_mgmt = er.management
-        ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
-                              name='toA',
-                              attributes={'role': 'edge',
-                                          'port': self.INTA_edge_port})
-        self.INT_B.wait_address("Edge1/*")
-
-        # delete it, and verify the routes are removed
-        ctor.delete()
-        self._wait_address_gone(self.INT_B, "Edge1/*")
-
-        # now recreate and verify routes re-appear
-        ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
-                              name='toA',
-                              attributes={'role': 'edge',
-                                          'port': self.INTA_edge_port})
-        self.INT_B.wait_address("Edge1/*")
-        self._test_traffic(self.INT_B.listener,
-                           self.INT_B.listener,
-                           "Edge1/One",
-                           count=5)
-        fs.join()
-        self.assertEqual(5, fs.in_count)
-        self.assertEqual(5, fs.out_count)
-
-        er.teardown()
-        self._wait_address_gone(self.INT_B, "Edge1/*")
-
-    def test_thrashing_link_routes(self):
-        """
-        Rapidly add and delete link routes at the edge
-        """
-
-        # activate the pre-configured link routes
-        ea1_mgmt = self.EA1.management
-        fs = FakeService(self.EA1.route_container)
-        self.INT_B.wait_address("CfgLinkRoute1")
-
-        for i in range(10):
-            lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
-                                  name="TestLRout%d" % i,
-                                  attributes={'pattern': 'Test/*/%d/#' % i,
-                                              'containerId': 'FakeBroker',
-                                              'direction': 'out'})
-            lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
-                                  name="TestLRin%d" % i,
-                                  attributes={'pattern': 'Test/*/%d/#' % i,
-                                              'containerId': 'FakeBroker',
-                                              'direction': 'in'})
-            # verify that they are correctly propagated (once)
-            if i == 9:
-                self.INT_B.wait_address("Test/*/9/#")
-            lr1.delete()
-            lr2.delete()
-
-        fs.join()
-        self._wait_address_gone(self.INT_B, "CfgLinkRoute1")
-
 
 class Timeout(object):
     def __init__(self, parent):
diff --git a/tests/test_broker.py b/tests/test_broker.py
index a05df02..df60626 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -39,11 +39,6 @@ from proton.reactor import AtMostOnce
 from system_test import TIMEOUT
 
 
-class FakeBrokerStop(Exception):
-    """stop the broker from a handler callback"""
-    pass
-
-
 class FakeBroker(MessagingHandler):
     """
     A fake broker-like service that listens for client connections
@@ -110,22 +105,17 @@ class FakeBroker(MessagingHandler):
         self._container.timeout = 1.0
         self._container.start()
 
-        try:
-            while self._container.process():
-                if self._stop_thread:
-                    break
-
-            if self.acceptor:
-                self.acceptor.close()
-                self.acceptor = None
-            for c in self._connections:
-                c.close()
-            self._connections = []
-            self._container.process()
-        except FakeBrokerStop:
-            # this abruptly kills the broker useful to test how dispatch deals
-            # with hung/stopped containers
-            pass
+        while self._container.process():
+            if self._stop_thread:
+                break
+
+        if self.acceptor:
+            self.acceptor.close()
+            self.acceptor = None
+        for c in self._connections:
+            c.close()
+        self._connections = []
+        self._container.process()
 
     def join(self):
         self._stop_thread = True


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org