You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/04/15 16:05:11 UTC

qpid-dispatch git commit: DISPATCH-237 - Added unit test to make sure that the delivery tags are preserved during link routing

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7978579c8 -> e43f475f4


DISPATCH-237 - Added unit test to make sure that the delivery tags are preserved during link routing


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e43f475f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e43f475f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e43f475f

Branch: refs/heads/master
Commit: e43f475f4b2cceea9b43a752c53e7a7937534f27
Parents: 7978579
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Apr 15 10:04:52 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Apr 15 10:04:52 2016 -0400

----------------------------------------------------------------------
 tests/system_tests_link_routes.py | 127 +++++++++++++++++++++++++++++----
 1 file changed, 112 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e43f475f/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index d7f673a..e0f9e1c 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -23,8 +23,9 @@ from subprocess import PIPE, STDOUT
 
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
 
-from proton import Message
-from proton.reactor import AtMostOnce
+from proton import Message, Endpoint
+from proton.handlers import MessagingHandler
+from proton.reactor import AtMostOnce, Container
 from proton.utils import BlockingConnection, LinkDetached
 
 from qpid_dispatch.management.client import Node
@@ -75,6 +76,7 @@ class LinkRoutePatternTest(TestCase):
         a_listener_port = cls.tester.get_port()
         b_listener_port = cls.tester.get_port()
         c_listener_port = cls.tester.get_port()
+        test_tag_listener_port = cls.tester.get_port()
 
         router('A',
                [
@@ -83,12 +85,16 @@ class LinkRoutePatternTest(TestCase):
         router('B',
                [
                    ('listener', {'role': 'normal', 'addr': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   ('listener', {'name': 'test-tag', 'role': 'route-container', 'addr': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+
                    # This is an on-demand connection made from QDR.B's ephemeral port to a_listener_port
-                   ('connector', {'name': 'broker', 'role': 'on-demand', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   ('connector', {'name': 'broker', 'role': 'route-container', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
                    # Only inter router communication must happen on 'inter-router' connectors. This connector makes
                    # a connection from the router B's ephemeral port to c_listener_port
                    ('connector', {'role': 'inter-router', 'addr': '0.0.0.0', 'port': c_listener_port}),
-                   ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'})
+                   ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'}),
+                   ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'in'}),
+                   ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'out'})
                 ]
                )
         router('C',
@@ -98,7 +104,9 @@ class LinkRoutePatternTest(TestCase):
                    ('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}),
                    # Note here that the linkRoutePattern is set to org.apache. which makes it backward compatible.
                    # The dot(.) at the end is ignored by the address hashing scheme.
-                   ('linkRoutePattern', {'prefix': 'org.apache.'})
+                   ('linkRoutePattern', {'prefix': 'org.apache.'}),
+                   ('linkRoute', {'prefix': 'pulp.task', 'dir': 'in'}),
+                   ('linkRoute', {'prefix': 'pulp.task', 'dir': 'out'})
                 ]
                )
 
@@ -159,8 +167,8 @@ class LinkRoutePatternTest(TestCase):
         """
         out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
         out_list = out.split()
-        self.assertEqual(out_list.count('in'), 1)
-        self.assertEqual(out_list.count('out'), 1)
+        self.assertEqual(out_list.count('in'), 2)
+        self.assertEqual(out_list.count('out'), 2)
 
     def test_ccc_qdstat_link_routes_routerC(self):
         """
@@ -170,8 +178,8 @@ class LinkRoutePatternTest(TestCase):
         out = self.run_qdstat_linkRoute(self.routers[2].addresses[1])
         out_list = out.split()
 
-        self.assertEqual(out_list.count('in'), 1)
-        self.assertEqual(out_list.count('out'), 1)
+        self.assertEqual(out_list.count('in'), 2)
+        self.assertEqual(out_list.count('out'), 2)
 
     def test_ddd_partial_link_route_match(self):
         """
@@ -223,7 +231,6 @@ class LinkRoutePatternTest(TestCase):
         # self.assertEqual(4, len()
         self.assertEquals(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results))
 
-        #blocking_receiver.close()
         blocking_connection.close()
 
     def test_partial_link_route_match_1(self):
@@ -263,7 +270,6 @@ class LinkRoutePatternTest(TestCase):
                                             name='M0org.apache.dev').deliveriesIngress,
                          "deliveriesIngress is wrong")
 
-        #blocking_receiver.close()
         blocking_connection.close()
 
     def test_full_link_route_match(self):
@@ -307,7 +313,6 @@ class LinkRoutePatternTest(TestCase):
                                             name='M0org.apache').deliveriesIngress,
                          "deliveriesIngress is wrong")
 
-        #blocking_receiver.close()
         blocking_connection.close()
 
     def test_full_link_route_match_1(self):
@@ -327,6 +332,7 @@ class LinkRoutePatternTest(TestCase):
 
         # Sender to  to org.apache
         blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
+
         msg = Message(body=hello_world_4)
         # Send a message
         blocking_sender.send(msg)
@@ -347,7 +353,6 @@ class LinkRoutePatternTest(TestCase):
                                             name='M0org.apache').deliveriesIngress,
                          "deliveriesIngress is wrong")
 
-        #blocking_receiver.close()
         blocking_connection.close()
 
     def test_zzz_qdmanage_delete_link_route(self):
@@ -361,6 +366,8 @@ class LinkRoutePatternTest(TestCase):
 
         identity_1 = result_list[0][1]
         identity_2 = result_list[1][1]
+        identity_3 = result_list[2][1]
+        identity_4 = result_list[3][1]
 
         cmd = 'DELETE --type=linkRoute --identity=' + identity_1
         self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
@@ -368,16 +375,21 @@ class LinkRoutePatternTest(TestCase):
         cmd = 'DELETE --type=linkRoute --identity=' + identity_2
         self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
 
+        cmd = 'DELETE --type=linkRoute --identity=' + identity_3
+        self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
+
+        cmd = 'DELETE --type=linkRoute --identity=' + identity_4
+        self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
+
         cmd = 'QUERY --type=linkRoute'
         out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0])
         self.assertEquals(out.rstrip(), '[]')
 
-        sleep(1)
-
         # linkRoutes now gone on QDR.B but remember that it still exist on QDR.C
         # We will now try to create a receiver on address org.apache.dev on QDR.C.
         # Since the linkRoute on QDR.B is gone, QDR.C
         # will not allow a receiver to be created since there is no route to destination.
+
         # Connects to listener #2 on QDR.C
         addr = self.routers[2].addresses[1]
 
@@ -398,6 +410,8 @@ class LinkRoutePatternTest(TestCase):
 
         identity_1 = result_list[0][1]
         identity_2 = result_list[1][1]
+        identity_3 = result_list[2][1]
+        identity_4 = result_list[3][1]
 
         cmd = 'DELETE --type=linkRoute --identity=' + identity_1
         self.run_qdmanage(cmd=cmd, address=addr)
@@ -405,6 +419,12 @@ class LinkRoutePatternTest(TestCase):
         cmd = 'DELETE --type=linkRoute --identity=' + identity_2
         self.run_qdmanage(cmd=cmd, address=addr)
 
+        cmd = 'DELETE --type=linkRoute --identity=' + identity_3
+        self.run_qdmanage(cmd=cmd, address=addr)
+
+        cmd = 'DELETE --type=linkRoute --identity=' + identity_4
+        self.run_qdmanage(cmd=cmd, address=addr)
+
         cmd = 'QUERY --type=linkRoute'
         out = self.run_qdmanage(cmd=cmd, address=addr)
         self.assertEquals(out.rstrip(), '[]')
@@ -436,5 +456,82 @@ class LinkRoutePatternTest(TestCase):
                                             name='M0org.apache.dev').deliveriesEgress,
                          "deliveriesEgress is wrong")
 
+    def test_yyy_delivery_tag(self):
+        """
+        Tests that the router carries over the delivery tag on a link routed delivery
+        """
+        listening_address = self.routers[1].addresses[1]
+        sender_address = self.routers[2].addresses[1]
+        qdstat_address = self.routers[2].addresses[1]
+        test = DeliveryTagsTest(sender_address, listening_address, qdstat_address)
+        test.run()
+        self.assertTrue(test.message_received)
+        self.assertTrue(test.delivery_tag_verified)
+
+class DeliveryTagsTest(MessagingHandler):
+    def __init__(self, sender_address, listening_address, qdstat_address):
+        super(DeliveryTagsTest, self).__init__()
+        self.sender_address = sender_address
+        self.listening_address = listening_address
+        self.sender = None
+        self.message_received = False
+        self.receiver_connection = None
+        self.sender_connection = None
+        self.qdstat_address = qdstat_address
+        self.id = '1235'
+        self.times = 1
+        self.delivery_tag_verified = False
+        # The delivery tag we are going to send in the transfer frame.
+        # We will later make sure that the same delivery tag shows up on the receiving end in the link-routed case.
+        self.delivery_tag = '92319'
+
+    def on_start(self, event):
+        self.receiver_connection = event.container.connect(self.listening_address)
+
+    def on_connection_remote_open(self, event):
+        if event.connection == self.receiver_connection:
+            continue_loop = True
+            # Don't open the sender connection until we are sure that there is a remote receiver ready to
+            # accept the message.
+            # If there is no remote receiver, the router will throw a 'No route to destination' error when
+            # creating the sender connection.
+            # The following loop introduces a wait before creating the sender connection. It gives the
+            # router time for the address Dpulp.task to report a remoteCount of 1.
+            i = 0
+            while continue_loop:
+                if i > 100: # If we have run the read command for more than hundred times and we still do not have
+                    # the remoteCount set to 1, there is a problem, just exit out of the function instead
+                    # of looping to infinity.
+                    self.receiver_connection.close()
+                    return
+                local_node = Node.connect(self.qdstat_address, timeout=TIMEOUT)
+                out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
+                if out == 1:
+                    continue_loop = False
+                i+=1
+
+            self.sender_connection = event.container.connect(self.sender_address)
+            self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce())
+
+    def on_sendable(self, event):
+        if self.times == 1:
+            msg = Message(body="Hello World")
+            self.sender.send(msg, tag=self.delivery_tag)
+            self.sender_connection.close()
+            self.times +=1
+
+    def on_message(self, event):
+        if "Hello World" == event.message.body:
+            self.message_received = True
+
+        # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means
+        # that the router has propagated the delivery tag successfully because of link routing.
+        if self.delivery_tag == event.delivery.tag:
+            self.delivery_tag_verified = True
+        self.receiver_connection.close()
+
+    def run(self):
+        Container(self).run()
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org