You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/14 19:20:32 UTC
[qpid-dispatch] branch master updated: DISPATCH-1154: update link route proxy tests, add missing delivery reference
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new b594672 DISPATCH-1154: update link route proxy tests, add missing delivery reference
b594672 is described below
commit b594672e41856e626d3b15e849cf39bced836b3a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Dec 12 11:50:40 2018 -0500
DISPATCH-1154: update link route proxy tests, add missing delivery reference
---
src/router_core/core_client_api.c | 9 ++
tests/system_test.py | 25 +++-
tests/system_tests_edge_router.py | 234 ++++++++++++++++++++++++--------------
tests/test_broker.py | 32 ++----
4 files changed, 188 insertions(+), 112 deletions(-)
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 9ef1c4e..7483869 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -286,6 +286,7 @@ static void _flush_send_queue_CT(qdrc_client_t *client)
req->delivery = qdrc_endpoint_delivery_CT(client->core,
client->sender,
msg);
+ qdr_delivery_incref(req->delivery, "core client send request");
qdrc_endpoint_send_CT(client->core,
client->sender,
req->delivery,
@@ -347,6 +348,10 @@ static void _free_request_CT(qdrc_client_t *client,
qd_compose_free(req->app_properties);
}
+ if (req->delivery) {
+ qdr_delivery_decref_CT(client->core, req->delivery, "core client send request");
+ }
+
// notify user that the request has completed
if (req->done_cb) {
req->done_cb(client->core,
@@ -478,6 +483,10 @@ static void _sender_update_CT(void *context,
DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req);
req->on_unsettled_list = false;
+ // delivery no longer needed
+ qdr_delivery_decref_CT(client->core, req->delivery, "core client send request");
+ req->delivery = 0;
+
if (!req->on_reply_list || disposition != PN_ACCEPTED) {
// no reply is coming, release the request
_free_request_CT(client, req, NULL);
diff --git a/tests/system_test.py b/tests/system_test.py
index 354eac6..bec6b7f 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -43,6 +43,7 @@ except ImportError:
from threading import Thread
from threading import Event
import json
+import uuid
import proton
from proton import Message, Timeout
@@ -720,11 +721,12 @@ class AsyncTestReceiver(MessagingHandler):
messages. Messages can be retrieved from this thread via the queue member.
:param wait: block the constructor until the link has been fully
established.
+ :param recover_link: restart on remote link detach
"""
Empty = Queue.Empty
def __init__(self, address, source, conn_args=None, container_id=None,
- wait=True):
+ wait=True, recover_link=False):
super(AsyncTestReceiver, self).__init__()
self.address = address
self.source = source
@@ -732,9 +734,11 @@ class AsyncTestReceiver(MessagingHandler):
self.queue = Queue.Queue()
self._conn = None
self._container = Container(self)
- if container_id is not None:
- self._container.container_id = container_id
+ cid = container_id or "ATR-%s:%s" % (source, uuid.uuid4())
+ self._container.container_id = cid
self._ready = Event()
+ self._recover_link = recover_link
+ self._recover_count = 0
self._stop_thread = False
self._thread = Thread(target=self._main)
self._thread.daemon = True
@@ -770,6 +774,17 @@ class AsyncTestReceiver(MessagingHandler):
def on_link_opened(self, event):
self._ready.set()
+ def on_link_closing(self, event):
+ event.link.close()
+ if self._recover_link and not self._stop_thread:
+ # lesson learned: the generated link name will be the same as the
+ # old link (which is bad) so we specify a new one
+ self._recover_count += 1
+ kwargs = {'source': self.source,
+ 'name': "%s:%s" % (event.link.name, self._recover_count)}
+ rcv = event.container.create_receiver(event.connection,
+ **kwargs)
+
def on_message(self, event):
self.queue.put(event.message)
@@ -787,8 +802,8 @@ class AsyncTestSender(MessagingHandler):
self._unaccepted = count
self._body = body or "test"
self._container = Container(self)
- if container_id is not None:
- self._container.container_id = container_id
+ cid = container_id or "ATS-%s:%s" % (target, uuid.uuid4())
+ self._container.container_id = cid
self._thread = Thread(target=self._main)
self._thread.daemon = True
self._thread.start()
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 3e2ead5..f9a55d9 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -34,7 +34,6 @@ from system_test import AsyncTestSender
from system_test import QdManager
from system_tests_link_routes import ConnLinkRouteService
from test_broker import FakeService
-from test_broker import FakeBrokerStop
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
from proton.utils import BlockingConnection
@@ -723,16 +722,19 @@ class LinkRouteProxyTest(TestCase):
cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector'
def _get_address(self, router, address):
+ """Lookup address in route table"""
a_type = 'org.apache.qpid.dispatch.router.address'
addrs = router.management.query(a_type).get_dicts()
return list(filter(lambda a: a['name'].find(address) != -1,
addrs))
def _wait_address_gone(self, router, address):
+ """Block until address is removed from the route table"""
while self._get_address(router, address):
sleep(0.1)
def _test_traffic(self, sender, receiver, address, count=5):
+ """Generate message traffic between two normal clients"""
tr = AsyncTestReceiver(receiver, address)
ts = AsyncTestSender(sender, address, count)
ts.wait() # wait until all sent
@@ -740,6 +742,91 @@ class LinkRouteProxyTest(TestCase):
tr.queue.get(timeout=TIMEOUT)
tr.stop()
+ def _test_attach_weirdness(self, service):
+ """Exercise a service that simulates link attach failures"""
+
+ # create a consumer, do not wait for link to open, reattach
+ # on received detach
+ rx = AsyncTestReceiver(self.EB1.listener, 'CfgLinkRoute1/foo',
+ wait=False, recover_link=True)
+ service.link_dropped.wait(timeout=TIMEOUT)
+ service.join() # wait for thread exit
+ del service
+
+ # now attach a working service to the same address,
+ # make sure it all works
+ fs = FakeService(self.EA1.route_container)
+ self.INT_B.wait_address("CfgLinkRoute1")
+
+ if True: #
+ rx.stop()
+ fs.join()
+ else:
+ tx = AsyncTestSender(self.EA1.listener, 'CfgLinkRoute1/foo',
+
+ body="HEY HO LET'S GO!")
+ tx.wait()
+
+ msg = rx.queue.get(timeout=TIMEOUT)
+ self.assertTrue(msg.body == "HEY HO LET'S GO!")
+ rx.stop()
+ fs.join()
+ self.assertEqual(1, fs.in_count)
+ self.assertEqual(1, fs.out_count)
+
+ def test_01_immedate_detach_reattach(self):
+ """
+ Have a service for a link routed address abruptly detach
+ in response to an incoming link attach
+
+ The attaching client from EB1 will get an attach response then an
+ immediate detach. The client will immediately re-establish the link.
+ """
+ class AttachDropper(FakeService):
+ def __init__(self, *args, **kwargs):
+ super(AttachDropper, self).__init__(*args, **kwargs)
+ self.link_dropped = Event()
+
+ def on_link_remote_open(self, event):
+ # drop it
+ event.link.close()
+ event.connection.close()
+ self.link_dropped.set()
+
+ ad = AttachDropper(self.EA1.route_container)
+ self.INT_B.wait_address("CfgLinkRoute1")
+ self._test_attach_weirdness(ad)
+
+ def test_02_thrashing_link_routes(self):
+ """
+ Rapidly add and delete link routes at the edge
+ """
+
+ # activate the pre-configured link routes
+ ea1_mgmt = self.EA1.management
+ fs = FakeService(self.EA1.route_container)
+ self.INT_B.wait_address("CfgLinkRoute1")
+
+ for i in range(10):
+ lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
+ name="TestLRout%d" % i,
+ attributes={'pattern': 'Test/*/%d/#' % i,
+ 'containerId': 'FakeBroker',
+ 'direction': 'out'})
+ lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
+ name="TestLRin%d" % i,
+ attributes={'pattern': 'Test/*/%d/#' % i,
+ 'containerId': 'FakeBroker',
+ 'direction': 'in'})
+ # verify that they are correctly propagated (once)
+ if i == 9:
+ self.INT_B.wait_address("Test/*/9/#")
+ lr1.delete()
+ lr2.delete()
+
+ fs.join()
+ self._wait_address_gone(self.INT_B, "CfgLinkRoute1")
+
def _validate_topology(self, router, expected_links, address):
"""
query existing links and verify they are set up as expected
@@ -765,7 +852,59 @@ class LinkRouteProxyTest(TestCase):
test_links)
self.assertTrue(len(matches) == 1)
- def test_link_topology(self):
+ def test_03_interior_conn_lost(self):
+ """
+ What happens when the interior connection bounces?
+ """
+ config = Qdrouterd.Config([('router', {'mode': 'edge',
+ 'id': 'Edge1'}),
+ ('listener', {'role': 'normal',
+ 'port': self.tester.get_port()}),
+ ('listener', {'name': 'rc',
+ 'role': 'route-container',
+ 'port': self.tester.get_port()}),
+ ('linkRoute', {'pattern': 'Edge1/*',
+ 'containerId': 'FakeBroker',
+ 'direction': 'in'}),
+ ('linkRoute', {'pattern': 'Edge1/*',
+ 'containerId': 'FakeBroker',
+ 'direction': 'out'})])
+ er = self.tester.qdrouterd('Edge1', config, wait=True)
+
+ # activate the link routes before the connection exists
+ fs = FakeService(er.addresses[1])
+ er.wait_address("Edge1/*")
+
+ # create the connection to interior
+ er_mgmt = er.management
+ ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
+ name='toA',
+ attributes={'role': 'edge',
+ 'port': self.INTA_edge_port})
+ self.INT_B.wait_address("Edge1/*")
+
+ # delete it, and verify the routes are removed
+ ctor.delete()
+ self._wait_address_gone(self.INT_B, "Edge1/*")
+
+ # now recreate and verify routes re-appear
+ ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
+ name='toA',
+ attributes={'role': 'edge',
+ 'port': self.INTA_edge_port})
+ self.INT_B.wait_address("Edge1/*")
+ self._test_traffic(self.INT_B.listener,
+ self.INT_B.listener,
+ "Edge1/One",
+ count=5)
+ fs.join()
+ self.assertEqual(5, fs.in_count)
+ self.assertEqual(5, fs.out_count)
+
+ er.teardown()
+ self._wait_address_gone(self.INT_B, "Edge1/*")
+
+ def test_50_link_topology(self):
"""
Verify that the link topology that results from activating a link route
and sending traffic is correct
@@ -825,7 +964,7 @@ class LinkRouteProxyTest(TestCase):
self.assertEqual(1, fs.in_count)
self.assertEqual(1, fs.out_count)
- def test_link_route_proxy_configured(self):
+ def test_51_link_route_proxy_configured(self):
"""
Activate the configured link routes via a FakeService, verify proxies
created by passing traffic from/to and interior router
@@ -834,10 +973,12 @@ class LinkRouteProxyTest(TestCase):
fs = FakeService(self.EA1.route_container)
self.INT_B.wait_address("CfgLinkRoute1")
+
self._test_traffic(self.INT_B.listener,
self.INT_B.listener,
"CfgLinkRoute1/hi",
count=5)
+
fs.join()
self.assertEqual(5, fs.in_count)
self.assertEqual(5, fs.out_count)
@@ -850,16 +991,19 @@ class LinkRouteProxyTest(TestCase):
fs = FakeService(self.EB1.route_container)
self.INT_A.wait_address("*.cfg.pattern.#")
+
self._test_traffic(self.INT_A.listener,
self.INT_A.listener,
"MATCH.cfg.pattern",
count=5)
+
fs.join()
self.assertEqual(5, fs.in_count)
self.assertEqual(5, fs.out_count)
self._wait_address_gone(self.INT_A, "*.cfg.pattern.#")
- def test_conn_link_route_proxy(self):
+
+ def test_52_conn_link_route_proxy(self):
"""
Test connection scoped link routes by connecting a fake service to the
Edge via the route-container connection. Have the fake service
@@ -896,88 +1040,6 @@ class LinkRouteProxyTest(TestCase):
self._wait_address_gone(self.INT_A, "Conn/*/One")
- def test_interior_conn_lost(self):
- """
- What happens when the interior connection bounces?
- """
- config = Qdrouterd.Config([('router', {'mode': 'edge',
- 'id': 'Edge1'}),
- ('listener', {'role': 'normal',
- 'port': self.tester.get_port()}),
- ('listener', {'name': 'rc',
- 'role': 'route-container',
- 'port': self.tester.get_port()}),
- ('linkRoute', {'pattern': 'Edge1/*',
- 'containerId': 'FakeBroker',
- 'direction': 'in'}),
- ('linkRoute', {'pattern': 'Edge1/*',
- 'containerId': 'FakeBroker',
- 'direction': 'out'})])
- er = self.tester.qdrouterd('Edge1', config, wait=True)
-
- # activate the link routes before the connection exists
- fs = FakeService(er.addresses[1])
- er.wait_address("Edge1/*")
-
- # create the connection to interior
- er_mgmt = er.management
- ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
- name='toA',
- attributes={'role': 'edge',
- 'port': self.INTA_edge_port})
- self.INT_B.wait_address("Edge1/*")
-
- # delete it, and verify the routes are removed
- ctor.delete()
- self._wait_address_gone(self.INT_B, "Edge1/*")
-
- # now recreate and verify routes re-appear
- ctor = er_mgmt.create(type=self.CONNECTOR_TYPE,
- name='toA',
- attributes={'role': 'edge',
- 'port': self.INTA_edge_port})
- self.INT_B.wait_address("Edge1/*")
- self._test_traffic(self.INT_B.listener,
- self.INT_B.listener,
- "Edge1/One",
- count=5)
- fs.join()
- self.assertEqual(5, fs.in_count)
- self.assertEqual(5, fs.out_count)
-
- er.teardown()
- self._wait_address_gone(self.INT_B, "Edge1/*")
-
- def test_thrashing_link_routes(self):
- """
- Rapidly add and delete link routes at the edge
- """
-
- # activate the pre-configured link routes
- ea1_mgmt = self.EA1.management
- fs = FakeService(self.EA1.route_container)
- self.INT_B.wait_address("CfgLinkRoute1")
-
- for i in range(10):
- lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
- name="TestLRout%d" % i,
- attributes={'pattern': 'Test/*/%d/#' % i,
- 'containerId': 'FakeBroker',
- 'direction': 'out'})
- lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE,
- name="TestLRin%d" % i,
- attributes={'pattern': 'Test/*/%d/#' % i,
- 'containerId': 'FakeBroker',
- 'direction': 'in'})
- # verify that they are correctly propagated (once)
- if i == 9:
- self.INT_B.wait_address("Test/*/9/#")
- lr1.delete()
- lr2.delete()
-
- fs.join()
- self._wait_address_gone(self.INT_B, "CfgLinkRoute1")
-
class Timeout(object):
def __init__(self, parent):
diff --git a/tests/test_broker.py b/tests/test_broker.py
index a05df02..df60626 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -39,11 +39,6 @@ from proton.reactor import AtMostOnce
from system_test import TIMEOUT
-class FakeBrokerStop(Exception):
- """stop the broker from a handler callback"""
- pass
-
-
class FakeBroker(MessagingHandler):
"""
A fake broker-like service that listens for client connections
@@ -110,22 +105,17 @@ class FakeBroker(MessagingHandler):
self._container.timeout = 1.0
self._container.start()
- try:
- while self._container.process():
- if self._stop_thread:
- break
-
- if self.acceptor:
- self.acceptor.close()
- self.acceptor = None
- for c in self._connections:
- c.close()
- self._connections = []
- self._container.process()
- except FakeBrokerStop:
- # this abruptly kills the broker useful to test how dispatch deals
- # with hung/stopped containers
- pass
+ while self._container.process():
+ if self._stop_thread:
+ break
+
+ if self.acceptor:
+ self.acceptor.close()
+ self.acceptor = None
+ for c in self._connections:
+ c.close()
+ self._connections = []
+ self._container.process()
def join(self):
self._stop_thread = True
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org