You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/04/15 16:05:11 UTC
qpid-dispatch git commit: DISPATCH-237 - Added unit test to make sure
that the delivery tags are preserved during link routing
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 7978579c8 -> e43f475f4
DISPATCH-237 - Added unit test to make sure that the delivery tags are preserved during link routing
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e43f475f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e43f475f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e43f475f
Branch: refs/heads/master
Commit: e43f475f4b2cceea9b43a752c53e7a7937534f27
Parents: 7978579
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Apr 15 10:04:52 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Apr 15 10:04:52 2016 -0400
----------------------------------------------------------------------
tests/system_tests_link_routes.py | 127 +++++++++++++++++++++++++++++----
1 file changed, 112 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e43f475f/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index d7f673a..e0f9e1c 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -23,8 +23,9 @@ from subprocess import PIPE, STDOUT
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
-from proton import Message
-from proton.reactor import AtMostOnce
+from proton import Message, Endpoint
+from proton.handlers import MessagingHandler
+from proton.reactor import AtMostOnce, Container
from proton.utils import BlockingConnection, LinkDetached
from qpid_dispatch.management.client import Node
@@ -75,6 +76,7 @@ class LinkRoutePatternTest(TestCase):
a_listener_port = cls.tester.get_port()
b_listener_port = cls.tester.get_port()
c_listener_port = cls.tester.get_port()
+ test_tag_listener_port = cls.tester.get_port()
router('A',
[
@@ -83,12 +85,16 @@ class LinkRoutePatternTest(TestCase):
router('B',
[
('listener', {'role': 'normal', 'addr': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+ ('listener', {'name': 'test-tag', 'role': 'route-container', 'addr': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+
# This is an on-demand connection made from QDR.B's ephemeral port to a_listener_port
- ('connector', {'name': 'broker', 'role': 'on-demand', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+ ('connector', {'name': 'broker', 'role': 'route-container', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
# Only inter router communication must happen on 'inter-router' connectors. This connector makes
# a connection from the router B's ephemeral port to c_listener_port
('connector', {'role': 'inter-router', 'addr': '0.0.0.0', 'port': c_listener_port}),
- ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'})
+ ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'}),
+ ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'in'}),
+ ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'out'})
]
)
router('C',
@@ -98,7 +104,9 @@ class LinkRoutePatternTest(TestCase):
('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
# Note here that the linkRoutePattern is set to org.apache. which makes it backward compatible.
# The dot(.) at the end is ignored by the address hashing scheme.
- ('linkRoutePattern', {'prefix': 'org.apache.'})
+ ('linkRoutePattern', {'prefix': 'org.apache.'}),
+ ('linkRoute', {'prefix': 'pulp.task', 'dir': 'in'}),
+ ('linkRoute', {'prefix': 'pulp.task', 'dir': 'out'})
]
)
@@ -159,8 +167,8 @@ class LinkRoutePatternTest(TestCase):
"""
out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
out_list = out.split()
- self.assertEqual(out_list.count('in'), 1)
- self.assertEqual(out_list.count('out'), 1)
+ self.assertEqual(out_list.count('in'), 2)
+ self.assertEqual(out_list.count('out'), 2)
def test_ccc_qdstat_link_routes_routerC(self):
"""
@@ -170,8 +178,8 @@ class LinkRoutePatternTest(TestCase):
out = self.run_qdstat_linkRoute(self.routers[2].addresses[1])
out_list = out.split()
- self.assertEqual(out_list.count('in'), 1)
- self.assertEqual(out_list.count('out'), 1)
+ self.assertEqual(out_list.count('in'), 2)
+ self.assertEqual(out_list.count('out'), 2)
def test_ddd_partial_link_route_match(self):
"""
@@ -223,7 +231,6 @@ class LinkRoutePatternTest(TestCase):
# self.assertEqual(4, len()
self.assertEquals(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results))
- #blocking_receiver.close()
blocking_connection.close()
def test_partial_link_route_match_1(self):
@@ -263,7 +270,6 @@ class LinkRoutePatternTest(TestCase):
name='M0org.apache.dev').deliveriesIngress,
"deliveriesIngress is wrong")
- #blocking_receiver.close()
blocking_connection.close()
def test_full_link_route_match(self):
@@ -307,7 +313,6 @@ class LinkRoutePatternTest(TestCase):
name='M0org.apache').deliveriesIngress,
"deliveriesIngress is wrong")
- #blocking_receiver.close()
blocking_connection.close()
def test_full_link_route_match_1(self):
@@ -327,6 +332,7 @@ class LinkRoutePatternTest(TestCase):
# Sender to to org.apache
blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
+
msg = Message(body=hello_world_4)
# Send a message
blocking_sender.send(msg)
@@ -347,7 +353,6 @@ class LinkRoutePatternTest(TestCase):
name='M0org.apache').deliveriesIngress,
"deliveriesIngress is wrong")
- #blocking_receiver.close()
blocking_connection.close()
def test_zzz_qdmanage_delete_link_route(self):
@@ -361,6 +366,8 @@ class LinkRoutePatternTest(TestCase):
identity_1 = result_list[0][1]
identity_2 = result_list[1][1]
+ identity_3 = result_list[2][1]
+ identity_4 = result_list[3][1]
cmd = 'DELETE --type=linkRoute --identity=' + identity_1
self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
@@ -368,16 +375,21 @@ class LinkRoutePatternTest(TestCase):
cmd = 'DELETE --type=linkRoute --identity=' + identity_2
self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
+ cmd = 'DELETE --type=linkRoute --identity=' + identity_3
+ self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
+
+ cmd = 'DELETE --type=linkRoute --identity=' + identity_4
+ self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
+
cmd = 'QUERY --type=linkRoute'
out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
self.assertEquals(out.rstrip(), '[]')
- sleep(1)
-
# linkRoutes now gone on QDR.B but remember that it still exist on QDR.C
# We will now try to create a receiver on address org.apache.dev on QDR.C.
# Since the linkRoute on QDR.B is gone, QDR.C
# will not allow a receiver to be created since there is no route to destination.
+
# Connects to listener #2 on QDR.C
addr = self.routers[2].addresses[1]
@@ -398,6 +410,8 @@ class LinkRoutePatternTest(TestCase):
identity_1 = result_list[0][1]
identity_2 = result_list[1][1]
+ identity_3 = result_list[2][1]
+ identity_4 = result_list[3][1]
cmd = 'DELETE --type=linkRoute --identity=' + identity_1
self.run_qdmanage(cmd=cmd, address=addr)
@@ -405,6 +419,12 @@ class LinkRoutePatternTest(TestCase):
cmd = 'DELETE --type=linkRoute --identity=' + identity_2
self.run_qdmanage(cmd=cmd, address=addr)
+ cmd = 'DELETE --type=linkRoute --identity=' + identity_3
+ self.run_qdmanage(cmd=cmd, address=addr)
+
+ cmd = 'DELETE --type=linkRoute --identity=' + identity_4
+ self.run_qdmanage(cmd=cmd, address=addr)
+
cmd = 'QUERY --type=linkRoute'
out = self.run_qdmanage(cmd=cmd, address=addr)
self.assertEquals(out.rstrip(), '[]')
@@ -436,5 +456,82 @@ class LinkRoutePatternTest(TestCase):
name='M0org.apache.dev').deliveriesEgress,
"deliveriesEgress is wrong")
+ def test_yyy_delivery_tag(self):
+ """
+ Tests that the router carries over the delivery tag on a link routed delivery
+ """
+ listening_address = self.routers[1].addresses[1]
+ sender_address = self.routers[2].addresses[1]
+ qdstat_address = self.routers[2].addresses[1]
+ test = DeliveryTagsTest(sender_address, listening_address, qdstat_address)
+ test.run()
+ self.assertTrue(test.message_received)
+ self.assertTrue(test.delivery_tag_verified)
+
+class DeliveryTagsTest(MessagingHandler):
+ def __init__(self, sender_address, listening_address, qdstat_address):
+ super(DeliveryTagsTest, self).__init__()
+ self.sender_address = sender_address
+ self.listening_address = listening_address
+ self.sender = None
+ self.message_received = False
+ self.receiver_connection = None
+ self.sender_connection = None
+ self.qdstat_address = qdstat_address
+ self.id = '1235'
+ self.times = 1
+ self.delivery_tag_verified = False
+ # The delivery tag we are going to send in the transfer frame
+ # We will later make sure that the same delivery tag shows up on the receiving end in the link routed case.
+ self.delivery_tag = '92319'
+
+ def on_start(self, event):
+ self.receiver_connection = event.container.connect(self.listening_address)
+
+ def on_connection_remote_open(self, event):
+ if event.connection == self.receiver_connection:
+ continue_loop = True
+ # Dont open the sender connection unless we can make sure that there is a remote receiver ready to
+ # accept the message.
+ # If there is no remote receiver, the router will throw a 'No route to destination' error when
+ # creating sender connection.
+ # The following loops introduces a wait before creating the sender connection. It gives time to the
+ # router so that the address Dpulp.task can show up on the remoteCount
+ i = 0
+ while continue_loop:
+ if i > 100: # If we have run the read command for more than hundred times and we still do not have
+ # the remoteCount set to 1, there is a problem, just exit out of the function instead
+ # of looping to infinity.
+ self.receiver_connection.close()
+ return
+ local_node = Node.connect(self.qdstat_address, timeout=TIMEOUT)
+ out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
+ if out == 1:
+ continue_loop = False
+ i+=1
+
+ self.sender_connection = event.container.connect(self.sender_address)
+ self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce())
+
+ def on_sendable(self, event):
+ if self.times == 1:
+ msg = Message(body="Hello World")
+ self.sender.send(msg, tag=self.delivery_tag)
+ self.sender_connection.close()
+ self.times +=1
+
+ def on_message(self, event):
+ if "Hello World" == event.message.body:
+ self.message_received = True
+
+ # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means
+ # that the router has propagated the delivery tag successfully because of link routing.
+ if self.delivery_tag == event.delivery.tag:
+ self.delivery_tag_verified = True
+ self.receiver_connection.close()
+
+ def run(self):
+ Container(self).run()
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org