You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/12 16:47:59 UTC
qpid-dispatch git commit: NO-JIRA: add more tests for the edge router
Repository: qpid-dispatch
Updated Branches:
refs/heads/master fafd1d618 -> 15b70c433
NO-JIRA: add more tests for the edge router
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/15b70c43
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/15b70c43
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/15b70c43
Branch: refs/heads/master
Commit: 15b70c43379c4fe730922d10425ef96d76ce9eeb
Parents: fafd1d6
Author: Kenneth Giusti <kg...@apache.org>
Authored: Thu Dec 6 15:37:23 2018 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Wed Dec 12 11:44:14 2018 -0500
----------------------------------------------------------------------
tests/system_test.py | 9 ++--
tests/system_tests_edge_router.py | 93 +++++++++++++++++++++++++++++++++-
tests/test_broker.py | 30 ++++++++---
3 files changed, 119 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index ef15959..8820cc2 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -717,11 +717,14 @@ def main_module():
class AsyncTestReceiver(MessagingHandler):
"""
A simple receiver that runs in the background and queues any received
- messages. Messages can be retrieved from this thread via the queue member
+ messages. Messages can be retrieved from this thread via the queue member.
+ :param wait: block the constructor until the link has been fully
+ established.
"""
Empty = Queue.Empty
- def __init__(self, address, source, conn_args=None, container_id=None):
+ def __init__(self, address, source, conn_args=None, container_id=None,
+ wait=True):
super(AsyncTestReceiver, self).__init__()
self.address = address
self.source = source
@@ -736,7 +739,7 @@ class AsyncTestReceiver(MessagingHandler):
self._thread = Thread(target=self._main)
self._thread.daemon = True
self._thread.start()
- if self._ready.wait(timeout=TIMEOUT) is False:
+ if wait and self._ready.wait(timeout=TIMEOUT) is False:
raise Exception("Timed out waiting for receiver start")
def _main(self):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index b84e214..3e2ead5 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
from __future__ import print_function
from time import sleep
+from threading import Event
from threading import Timer
import unittest2 as unittest
@@ -30,10 +31,13 @@ from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
from system_test import AsyncTestReceiver
from system_test import AsyncTestSender
+from system_test import QdManager
from system_tests_link_routes import ConnLinkRouteService
from test_broker import FakeService
+from test_broker import FakeBrokerStop
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
from qpid_dispatch.management.client import Node
from subprocess import PIPE, STDOUT
import re
@@ -736,6 +740,91 @@ class LinkRouteProxyTest(TestCase):
tr.queue.get(timeout=TIMEOUT)
tr.stop()
+ def _validate_topology(self, router, expected_links, address):
+ """
+ query existing links and verify they are set up as expected
+ """
+ mgmt = QdManager(self, address=router)
+ # fetch all the connections
+ cl = mgmt.query('org.apache.qpid.dispatch.connection')
+ # map them by their identity
+ conns = dict([(c['identity'], c) for c in cl])
+
+ # now fetch all links for the address
+ ll = mgmt.query('org.apache.qpid.dispatch.router.link')
+ test_links = [l for l in ll if
+ l.get('owningAddr', '').find(address) != -1]
+ self.assertEqual(len(expected_links), len(test_links))
+
+ for elink in expected_links:
+ matches = filter(lambda l: (l['linkDir'] == elink[0]
+ and
+ conns[l['connectionId']]['container'] == elink[1]
+ and
+ conns[l['connectionId']]['role'] == elink[2]),
+ test_links)
+ self.assertTrue(len(matches) == 1)
+
+ def test_link_topology(self):
+ """
+ Verify that the link topology that results from activating a link route
+ and sending traffic is correct
+ """
+ fs = FakeService(self.EA1.route_container)
+ self.INT_B.wait_address("CfgLinkRoute1")
+
+ # create a sender on one edge and the receiver on another
+ bc_b = BlockingConnection(self.EB1.listener, timeout=TIMEOUT)
+ erx = bc_b.create_receiver(address="CfgLinkRoute1/buhbye", credit=10)
+ bc_a = BlockingConnection(self.EA1.listener, timeout=TIMEOUT)
+ etx = bc_a.create_sender(address="CfgLinkRoute1/buhbye")
+
+ etx.send(Message(body="HI THERE"), timeout=TIMEOUT)
+ self.assertEqual("HI THERE", erx.receive(timeout=TIMEOUT).body)
+ erx.accept()
+
+ # expect the following links have been established for the
+ # "CfgLinkRoute1/buhbye" address:
+
+ # EA1
+ # 1 out link to INT.A (connection role: edge)
+ # 1 in link from bc_a (normal)
+ # 1 in link from FakeBroker (route-container)
+ # 1 out link to FakeBroker (route-container)
+ # INT.A
+ # 1 in link from EA1 (edge)
+ # 1 out link to INT.B (inter-router)
+ # INT.B
+ # 1 out link to EB1 (edge)
+ # 1 in link from INT.A (inter-router)
+ # EB1
+ # 1 out link to bc_b (normal)
+ # 1 in link from INT.B (edge)
+
+ expect = {
+ self.EA1.listener: [
+ ('in', bc_a.container.container_id, 'normal'),
+ ('in', 'FakeBroker', 'route-container'),
+ ('out', 'FakeBroker', 'route-container'),
+ ('out', 'INT.A', 'edge')],
+ self.INT_A.listener: [
+ ('in', 'EA1', 'edge'),
+ ('out', 'INT.B', 'inter-router')],
+ self.INT_B.listener: [
+ ('in', 'INT.A', 'inter-router'),
+ ('out', 'EB1', 'edge')],
+ self.EB1.listener: [
+ ('in', 'INT.B', 'edge'),
+ ('out', bc_b.container.container_id, 'normal')]
+ }
+ for router, expected_links in expect.items():
+ self._validate_topology(router, expected_links,
+ 'CfgLinkRoute1/buhbye')
+
+ fs.join()
+ self.assertEqual(1, fs.in_count)
+ self.assertEqual(1, fs.out_count)
+
def test_link_route_proxy_configured(self):
"""
Activate the configured link routes via a FakeService, verify proxies
@@ -777,7 +866,7 @@ class LinkRouteProxyTest(TestCase):
configured some link routes. Then have clients on the interior
exchange messages via the fake service.
"""
- fs = ConnLinkRouteService(self.EA1.addresses[1],
+ fs = ConnLinkRouteService(self.EA1.route_container,
container_id="FakeService",
config = [("ConnLinkRoute1",
{"pattern": "Conn/*/One",
@@ -866,7 +955,7 @@ class LinkRouteProxyTest(TestCase):
# activate the pre-configured link routes
ea1_mgmt = self.EA1.management
- fs = FakeService(self.EA1.addresses[1])
+ fs = FakeService(self.EA1.route_container)
self.INT_B.wait_address("CfgLinkRoute1")
for i in range(10):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/test_broker.py
----------------------------------------------------------------------
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 0b9bdfa..a05df02 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -39,6 +39,11 @@ from proton.reactor import AtMostOnce
from system_test import TIMEOUT
+class FakeBrokerStop(Exception):
+ """stop the broker from a handler callback"""
+ pass
+
+
class FakeBroker(MessagingHandler):
"""
A fake broker-like service that listens for client connections
@@ -104,14 +109,23 @@ class FakeBroker(MessagingHandler):
def _main(self):
self._container.timeout = 1.0
self._container.start()
- while self._container.process():
- if self._stop_thread:
- if self.acceptor:
- self.acceptor.close()
- self.acceptor = None
- for c in self._connections:
- c.close()
- self._connections = []
+
+ try:
+ while self._container.process():
+ if self._stop_thread:
+ break
+
+ if self.acceptor:
+ self.acceptor.close()
+ self.acceptor = None
+ for c in self._connections:
+ c.close()
+ self._connections = []
+ self._container.process()
+ except FakeBrokerStop:
+ # this abruptly kills the broker useful to test how dispatch deals
+ # with hung/stopped containers
+ pass
def join(self):
self._stop_thread = True
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org