You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/05/24 13:42:02 UTC

qpid-dispatch git commit: DISPATCH-209 -- three-router test #3 dynamic reply-to. This closes #163

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 8c9f4a581 -> 5fd599d4d


DISPATCH-209 -- three-router test #3 dynamic reply-to. This closes #163

(cherry picked from commit b66a5c7620409252287ae2e3993a17c56758beae)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5fd599d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5fd599d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5fd599d4

Branch: refs/heads/master
Commit: 5fd599d4df3e0dfef377419636e5fffe9537b296
Parents: 8c9f4a5
Author: mick goulish <mg...@redhat.com>
Authored: Wed May 17 14:12:40 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed May 24 09:38:55 2017 -0400

----------------------------------------------------------------------
 tests/system_tests_three_routers.py | 354 ++++++++++++++++++++++---------
 1 file changed, 253 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5fd599d4/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index 83719e5..cfc210c 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -22,7 +22,7 @@ from subprocess import PIPE, STDOUT
 from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
 from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
 from proton.handlers import MessagingHandler
-from proton.reactor import Container, AtMostOnce, AtLeastOnce
+from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption
 import time
 
 
@@ -39,56 +39,121 @@ class RouterTest(TestCase):
 
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
+        """Start a router and a sender-listener client"""
         super(RouterTest, cls).setUpClass()
 
-        def router(name, connection_1, connection_2=None):
+        def router ( name, connection_1, connection_2=None ):
 
             config = [
-                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
-
-                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
-
-                ('address', {'prefix': 'closest',   'distribution': 'closest'}),
-                ('address', {'prefix': 'spread',    'distribution': 'balanced'}),
-                ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+                ('router',
+                  {'mode' : 'interior',
+                   'id'   : 'QDR.%s' % name
+                  }
+                ),
+                ('listener',
+                  {'port'             : cls.tester.get_port(),
+                   'stripAnnotations' : 'no'
+                  }
+                ),
+                ('address',
+                    { 'prefix'       : 'closest',
+                      'distribution' : 'closest'
+                    }
+                ),
             ]
-            config.append(connection_1)
+            config.append ( connection_1 )
             if None != connection_2:
-                config.append(connection_2)
+                config.append ( connection_2 )
 
-            config = Qdrouterd.Config(config)
+            config = Qdrouterd.Config ( config )
 
-            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            cls.routers.append ( cls.tester.qdrouterd(name, config, wait=True) )
 
         cls.routers = []
 
-        inter_router_port_1 = cls.tester.get_port()
-        inter_router_port_2 = cls.tester.get_port()
-
-        #   A <--- B <--- C
-        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) )
-
-        router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}),
-                    ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port_1, 'verifyHostName': 'no'}))
+        inter_router_port_A = cls.tester.get_port()
+        inter_router_port_B = cls.tester.get_port()
+        port_for_sender     = cls.tester.get_port()
+
+
+        router ( 'A',
+                   ( 'listener',
+                       {'role': 'inter-router',
+                        'port': inter_router_port_A
+                       }
+                   )
+               )
+
+        router ( 'B',
+                   ( 'listener',
+                       { 'role': 'inter-router',
+                         'port': inter_router_port_B
+                       }
+                   ),
+                   ( 'connector',
+                       { 'name': 'connectorToA',
+                         'role': 'inter-router',
+                         'port': inter_router_port_A,
+                         'verifyHostName': 'no'
+                       }
+                   )
+               )
+
+        router ( 'C',
+                   ( 'connector',
+                       { 'name': 'connectorToB',
+                         'role': 'inter-router',
+                         'port': inter_router_port_B,
+                         'verifyHostName': 'no'
+                       }
+                   ),
+                   ( 'listener',
+                       { 'role': 'normal',
+                         'port': port_for_sender
+                       }
+                   )
+               )
+
+
+        cls.router_A = cls.routers[0]
+        cls.router_B = cls.routers[1]
+        cls.router_C = cls.routers[2]
+
+        #----------------------------------------------
+        # Wait until everybody can see everybody,
+        # to minimize the time when the network
+        # doesn't know how to route my messages.
+        #----------------------------------------------
+        cls.router_C.wait_router_connected('QDR.B')
+        cls.router_B.wait_router_connected('QDR.A')
+        cls.router_A.wait_router_connected('QDR.C')
+
+        #------------------------------------------------
+        # Each test is given two addresses: the first is
+        # used by the sender, the second by the receiver.
+        #
+        #   receiver <--- A <--- B <--- C <--- sender
+        #
+        #------------------------------------------------
+        cls.send_addr = cls.router_C.addresses[1]
+        cls.recv_addr = cls.router_A.addresses[0]
 
-        router('C', ('connector', {'name': 'connectorToB', 'role': 'inter-router', 'port': inter_router_port_2, 'verifyHostName': 'no'}))
-
-        cls.routers[0].wait_router_connected('QDR.C')
-        cls.routers[1].wait_router_connected('QDR.B')
-        cls.routers[2].wait_router_connected('QDR.A')
 
+    def test_01_targeted_sender(self):
+        test = TargetedSenderTest ( self.send_addr, self.recv_addr )
+        test.run()
+        self.assertEqual(None, test.error)
 
+    def test_02_anonymous_sender(self):
+        test = AnonymousSenderTest ( self.send_addr, self.recv_addr )
+        test.run()
+        self.assertEqual(None, test.error)
 
-    def test_01_targeted_sender(self):
-        test = TargetedSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
+    def test_03_dynamic_reply_to(self):
+        test = DynamicReplyTo ( self.send_addr, self.recv_addr )
         test.run()
         self.assertEqual(None, test.error)
 
-#    def test_02_anonymous_sender(self):
-#        test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
-#        test.run()
-#        self.assertEqual(None, test.error)
 
 
 class Timeout(object):
@@ -100,10 +165,10 @@ class Timeout(object):
 
 
 class TargetedSenderTest(MessagingHandler):
-    def __init__(self, address1, address2):
+    def __init__(self, send_addr, recv_addr):
         super(TargetedSenderTest, self).__init__(prefetch=0)
-        self.address1 = address1
-        self.address2 = address2
+        self.send_addr = send_addr
+        self.recv_addr = recv_addr
         self.dest = "closest.Targeted"
         self.error      = None
         self.sender     = None
@@ -114,24 +179,29 @@ class TargetedSenderTest(MessagingHandler):
         self.n_accepted = 0
 
     def timeout(self):
-        self.error = "Timeout Expired"
-        self.conn1.close()
-        self.conn2.close()
+        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+                     (self.n_sent, self.n_received, self.n_accepted)
+        self.send_conn.close()
+        self.recv_conn.close()
 
     def on_start(self, event):
-        # receiver <--- A <--- B <---- C <--- sender
-        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn1 = event.container.connect(self.address1)
-        self.conn2 = event.container.connect(self.address2)
-        self.sender   = event.container.create_sender(self.conn1, self.dest)
-        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+        self.timer = event.reactor.schedule(10, Timeout(self))
+        self.send_conn = event.container.connect(self.send_addr)
+        self.recv_conn = event.container.connect(self.recv_addr)
+        self.sender   = event.container.create_sender(self.send_conn, self.dest)
+        self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
         self.receiver.flow(self.n_expected)
 
+
+    def send(self):
+      while self.sender.credit > 0 and self.n_sent < self.n_expected:
+        msg = Message(body=self.n_sent)
+        self.sender.send(msg)
+        self.n_sent += 1
+
     def on_sendable(self, event):
         if self.n_sent < self.n_expected:
-            msg = Message(body=self.n_sent)
-            event.sender.send(msg)
-            self.n_sent += 1
+            self.send()
 
     def on_accepted(self, event):
         self.n_accepted += 1
@@ -140,8 +210,8 @@ class TargetedSenderTest(MessagingHandler):
         self.n_received += 1
         if self.n_received == self.n_expected:
             self.receiver.close()
-            self.conn1.close()
-            self.conn2.close()
+            self.send_conn.close()
+            self.recv_conn.close()
             self.timer.cancel()
 
     def run(self):
@@ -149,77 +219,159 @@ class TargetedSenderTest(MessagingHandler):
 
 
 
+class DynamicTarget(LinkOption):
+    def apply(self, link):
+        link.target.dynamic = True
+        link.target.address = None
+
+
+
 class AnonymousSenderTest(MessagingHandler):
-    def __init__(self, address1, address2):
-        super(AnonymousSenderTest, self).__init__(prefetch=0)
-        self.address1 = address1
-        self.address2 = address2
-        self.dest = "closest.Anonymous"
-        self.error      = None
-        self.sender     = None
-        self.receiver   = None
-        self.n_expected = 10
+
+    def __init__(self, send_addr, recv_addr):
+        super(AnonymousSenderTest, self).__init__()
+        self.send_addr = send_addr
+        self.recv_addr = recv_addr
+
+        self.error     = None
+        self.recv_conn = None
+        self.send_conn = None
+        self.sender    = None
+        self.receiver  = None
+        self.address   = None
+
+        self.expected   = 10
         self.n_sent     = 0
         self.n_received = 0
         self.n_accepted = 0
 
-    def timeout(self):
-        self.error = "Timeout Expired %d messages received." % self.n_received
-        self.conn1.close()
-        self.conn2.close()
-
-    # The problem with using an anonymous sender in a router
-    # network is that it takes finite time for endpoint information
-    # to propagate around the network.  It is possible for me to
-    # start sending before my router knows how to route my messages,
-    # which will cause them to get released, and my test will hang,
-    # doomed to wait eternally for the tenth message to be received.
-    # To fix this, we will detect released messages here, and decrement
-    # the sent message counter, forcing a resend for each drop.
-    # And also pause for a moment, since we know that the network is
-    # not yet ready.
-    def on_released(self, event):
-        self.n_sent -= 1
-        time.sleep(0.1)
 
-    def on_link_opened(self, event):
-        if event.receiver:
-            # This sender has no destination addr, so we will have to
-            # address each message individually.
-            # Also -- Create the sender here, when we know that the
-            # receiver link has opened, because then we are at least
-            # close to being able to send.  (See comment above.)
-            self.sender = event.container.create_sender(self.conn1, None)
+    def timeout ( self ):
+        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+                     (self.n_sent, self.n_received, self.n_accepted)
+        self.send_conn.close()
+        self.recv_conn.close()
 
     def on_start(self, event):
-        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn1 = event.container.connect(self.address1)
-        self.conn2 = event.container.connect(self.address2)
-        self.receiver = event.container.create_receiver(self.conn2, self.dest)
-        self.receiver.flow(self.n_expected)
+        self.timer     = event.reactor.schedule(10, Timeout(self))
+        self.send_conn = event.container.connect(self.send_addr)
+        self.recv_conn = event.container.connect(self.recv_addr)
+        self.sender    = event.container.create_sender(self.send_conn, options=DynamicTarget())
 
-    def on_sendable(self, event):
-        if self.n_sent < self.n_expected:
-            # Add the destination addr to each message.
-            msg = Message(body=self.n_sent, address=self.dest)
-            event.sender.send(msg)
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.expected:
             self.n_sent += 1
+            m = Message(address=self.address, body="Message %d of %d" % (self.n_sent, self.expected))
+            self.sender.send(m)
+
+    def on_link_opened(self, event):
+        if event.sender == self.sender:
+            self.address = self.sender.remote_target.address
+            self.receiver = event.container.create_receiver(self.recv_conn, self.address)
+
+    def on_sendable(self, event):
+        self.send()
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            self.n_received += 1
 
     def on_accepted(self, event):
         self.n_accepted += 1
+        if self.n_accepted == self.expected:
+            self.send_conn.close()
+            self.recv_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+
+
+#=======================================================================
+# In this test we have a separate 'client' and 'server' with separate
+# connections.  The client sends requests to the server, and embeds in
+# them its desired reply-to address.  The server uses that address to
+# send back ambiguous and noncommittal messages.  The test ends with
+# success if the client receives the expected number of replies, or
+# with failure if we time out.
+#=======================================================================
+class DynamicReplyTo(MessagingHandler):
+    def __init__(self, client_addr, server_addr):
+        super(DynamicReplyTo, self).__init__(prefetch=10)
+        self.client_addr        = client_addr
+        self.server_addr        = server_addr
+        self.dest               = "closest.dynamicRequestResponse"
+        self.error              = None
+        self.server_receiver    = None
+        self.client_receiver    = None
+        self.sender             = None
+        self.server_sender      = None
+        self.n_expected         = 10
+        self.n_sent             = 0
+        self.received_by_server = 0
+        self.received_by_client = 0
+
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d received_by_server=%d received_by_client=%d" % \
+                     (self.n_sent, self.received_by_server, self.received_by_client)
+        self.client_connection.close()
+        self.server_connection.close()
+
+
+    def on_start ( self, event ):
+        self.timer             = event.reactor.schedule ( 10, Timeout(self) )
+
+        # separate connections to simulate client and server.
+        self.client_connection = event.container.connect(self.client_addr)
+        self.server_connection = event.container.connect(self.server_addr)
+
+        self.sender            = event.container.create_sender(self.client_connection, self.dest)
+        self.server_sender     = event.container.create_sender(self.server_connection, None)
+
+        self.server_receiver   = event.container.create_receiver(self.server_connection, self.dest)
+        self.client_receiver   = event.container.create_receiver(self.client_connection, None, dynamic=True)
+
+
+
+    def on_sendable(self, event):
+        while event.sender.credit > 0 and self.n_sent < self.n_expected:
+            # We send to server, and tell it how to reply to the client.
+            reply_to_addr = self.client_receiver.remote_source.address
+
+            request = Message ( body=self.n_sent,
+                                address=self.dest,
+                                reply_to = reply_to_addr )
+            event.sender.send ( request )
+            self.n_sent += 1
+
 
     def on_message(self, event):
-        self.n_received += 1
-        if self.n_received == self.n_expected:
-            self.receiver.close()
-            self.conn1.close()
-            self.conn2.close()
-            self.timer.cancel()
+        # Server gets a request and responds to
+        # the address that is embedded in the message.
+        if event.receiver == self.server_receiver :
+            self.server_sender.send ( Message(address=event.message.reply_to,
+                                      body="Reply hazy, try again later.") )
+            self.received_by_server += 1
+
+        # Client gets a response and counts it.
+        elif event.receiver == self.client_receiver :
+            self.received_by_client += 1
+            if self.received_by_client == self.n_expected:
+                self.timer.cancel()
+                self.server_receiver.close()
+                self.client_receiver.close()
+                self.client_connection.close()
+                self.server_connection.close()
+
 
     def run(self):
         Container(self).run()
 
 
 
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org