You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/19 14:23:00 UTC

qpid-dispatch git commit: DISPATCH-209 - From Mick Goulish - Added anonymous sender for 3-router test. This closes #126

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 21a32cad7 -> 3086de102


DISPATCH-209 - From Mick Goulish - Added anonymous sender for 3-router test.
This closes #126


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3086de10
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3086de10
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3086de10

Branch: refs/heads/master
Commit: 3086de102ff125fc9a218c15140db4f858210da9
Parents: 21a32ca
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Dec 19 09:21:52 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Dec 19 09:21:52 2016 -0500

----------------------------------------------------------------------
 tests/system_tests_three_routers.py | 84 +++++++++++++++++++++++++++++++-
 1 file changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3086de10/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index def31ff..2414c92 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -23,6 +23,7 @@ from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SS
 from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, AtMostOnce, AtLeastOnce
+import time
 
 
 # PROTON-828:
@@ -65,6 +66,7 @@ class RouterTest(TestCase):
         inter_router_port_1 = cls.tester.get_port()
         inter_router_port_2 = cls.tester.get_port()
 
+        #   A <--- B <--- C 
         router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) )
 
         router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}),
@@ -83,6 +85,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_02_anonymous_sender(self):
+        test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -112,7 +119,8 @@ class TargetedSenderTest(MessagingHandler):
         self.conn2.close()
 
     def on_start(self, event):
-        self.timer = event.reactor.schedule(10, Timeout(self))
+        # receiver <--- A <--- B <---- C <--- sender
+        self.timer = event.reactor.schedule(5, Timeout(self))
         self.conn1 = event.container.connect(self.address1)
         self.conn2 = event.container.connect(self.address2)
         self.sender   = event.container.create_sender(self.conn1, self.dest)
@@ -136,10 +144,82 @@ class TargetedSenderTest(MessagingHandler):
             self.conn2.close()
             self.timer.cancel()
 
-
     def run(self):
         Container(self).run()
+ 
+ 
 
+class AnonymousSenderTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(AnonymousSenderTest, self).__init__(prefetch=0)
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.Anonymous"
+        self.error      = None
+        self.sender     = None
+        self.receiver   = None
+        self.n_expected = 10
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+ 
+    def timeout(self):
+        self.error = "Timeout Expired %d messages received." % self.n_received
+        self.conn1.close()
+        self.conn2.close()
+ 
+    # The problem with using an anonymous sender in a router
+    # network is that it takes a finite amount of time for endpoint
+    # information to propagate around the network.  The sender may
+    # start sending before its router knows how to route the messages,
+    # which causes them to be released; the test would then stall,
+    # waiting for a tenth message that never arrives, until it times out.
+    # To fix this, we detect released messages here and decrement
+    # the sent-message counter, forcing a resend for each drop.
+    # We also pause for a moment, since we know that the network is
+    # not yet ready.
+    def on_released(self, event):
+        self.n_sent -= 1
+        time.sleep(0.1)
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            # This sender has no destination addr, so we will have to
+            # address each message individually.
+            # Also -- Create the sender here, when we know that the
+            # receiver link has opened, because then we are at least 
+            # close to being able to send.  (See comment above.)
+            self.sender = event.container.create_sender(self.conn1, None)
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+        self.receiver.flow(self.n_expected)
+ 
+    def on_sendable(self, event):
+        if self.n_sent < self.n_expected:
+            # Add the destination addr to each message.
+            msg = Message(body=self.n_sent, address=self.dest)
+            event.sender.send(msg)
+            self.n_sent += 1
+ 
+    def on_accepted(self, event):
+        self.n_accepted += 1
+ 
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.n_expected:
+            self.receiver.close()
+            self.conn1.close()
+            self.conn2.close()
+            self.timer.cancel()
 
+    def run(self):
+        Container(self).run()
+ 
+ 
+ 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org