You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/07/24 13:09:05 UTC

qpid-dispatch git commit: make all receivers get messages

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master b38d81d0f -> ee4285f9b


make all receivers get messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ee4285f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ee4285f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ee4285f9

Branch: refs/heads/master
Commit: ee4285f9bcceb788d0cd5c7d4da01991733cdd55
Parents: b38d81d
Author: mick goulish <mg...@redhat.com>
Authored: Fri Jul 21 20:06:14 2017 -0400
Committer: mick goulish <mg...@redhat.com>
Committed: Fri Jul 21 20:06:14 2017 -0400

----------------------------------------------------------------------
 tests/system_tests_three_routers.py | 188 ++++++++++++++++++++++++++++++-
 1 file changed, 187 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ee4285f9/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index 289cc02..07ea8fd 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -48,7 +48,9 @@ class RouterTest(TestCase):
         def router(name, more_config):
 
 
-            config = [ ('router', {'mode': 'interior', 'id': name}) ] + more_config
+            config = [ ('router',  {'mode': 'interior', 'id': name}),
+                       ('address', {'prefix': 'closest', 'distribution': 'closest'})
+                     ] + more_config
 
             config = Qdrouterd.Config(config)
 
@@ -180,6 +182,7 @@ class RouterTest(TestCase):
         cls.routers[2].wait_router_connected('B')
 
         cls.A_normal_addr      = cls.routers[0].addresses[0]
+        cls.B_normal_addr      = cls.routers[1].addresses[0]
         cls.C_normal_addr      = cls.routers[2].addresses[0]
 
         cls.sender_addr        = cls.C_normal_addr
@@ -215,6 +218,15 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_05_closest ( self ):
+        test = ClosestTest ( self.A_normal_addr,
+                             self.B_normal_addr,
+                             self.C_normal_addr,
+                             "addr_05"
+                           )
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 
 
@@ -624,5 +636,179 @@ class LinkAttachRouting ( MessagingHandler ):
         container.run()
 
 
+
+class ClosestTest ( MessagingHandler ):
+    """
+         Test whether distance-based message routing works in a
+         linear 3-router network.
+
+         sender -----> NEAR -----> MID -----> FAR
+                        |           |          |
+                        v           v          v
+                       near        mid        far
+                       rcvrs       rcvrs      rcvrs
+
+         With a linear network of 3 routers, set up a sender on the
+         near one, and then 2 receivers each on the near, middle, and
+         far routers.
+         After the first 10 messages have been received, close the
+         near routers and check results so far.  All 10 messages should
+         have gone to the near receivers, and none to the mid or far
+         receivers.
+         After the next 10 messages have been received, close the two
+         middle routers and check again.  All 10 messages should have
+         gone to the middle receivers, and none to the far ones.
+         Finally, after another 10 messages have been received, check
+         that they went to the far receivers.
+    """
+    def __init__ ( self, near_router, mid_router, far_router, addr_suffix ):
+        super ( ClosestTest, self ).__init__(prefetch=0)
+        self.error         = None
+        self.near_router   = near_router
+        self.mid_router    = mid_router
+        self.far_router    = far_router
+        self.addr_suffix   = addr_suffix
+        self.dest          = "closest/" + addr_suffix
+
+        # This n_expected is actually the minimum number of messages
+        # I will send.  The real number will be higher because some
+        # will be released when I close the near and middle receivers.
+        self.n_expected = 150
+        self.one_third  = self.n_expected / 3
+
+        # n_received -- the grand total -- is used to decide when to
+        # close the near receivers and later the middle ones.
+        self.n_received    = 0
+
+        # Counters for the near, middle, and far receivers are used
+        # to determine whether there has been an error.
+        self.near_1 = 0
+        self.near_2 = 0
+        self.mid_1  = 0
+        self.mid_2  = 0
+        self.far_1  = 0
+        self.far_2  = 0
+
+
+    def timeout ( self ):
+        self.check_results ( )
+        self.bail ( "Timeout Expired " )
+
+
+    def bail ( self, text ):
+        self.timer.cancel()
+        self.error = text
+        self.send_cnx.close()
+        self.near_cnx.close()
+        self.mid_cnx.close()
+        self.far_cnx.close()
+
+
+    def on_start ( self, event ):
+        self.timer       = event.reactor.schedule  ( TIMEOUT, Timeout(self) )
+        self.send_cnx    = event.container.connect ( self.near_router )
+        self.near_cnx    = event.container.connect ( self.near_router )
+        self.mid_cnx     = event.container.connect ( self.mid_router )
+        self.far_cnx     = event.container.connect ( self.far_router )
+
+        self.sender      = event.container.create_sender   ( self.send_cnx, self.dest)
+
+        # The two receiver-links on each router must be given
+        # explicit distinct names, or we will in fact only get
+        # one link.  And then wonder why receiver 2 on each
+        # router isn't getting any messages.
+        self.near_recv_1 = event.container.create_receiver  ( self.near_cnx, self.dest, name="1" )
+        self.near_recv_2 = event.container.create_receiver  ( self.near_cnx, self.dest, name="2" )
+
+        self.mid_recv_1  = event.container.create_receiver  ( self.mid_cnx,  self.dest, name="1" )
+        self.mid_recv_2  = event.container.create_receiver  ( self.mid_cnx,  self.dest, name="2" )
+
+        self.far_recv_1  = event.container.create_receiver  ( self.far_cnx,  self.dest, name="1" )
+        self.far_recv_2  = event.container.create_receiver  ( self.far_cnx,  self.dest, name="2" )
+
+        self.near_recv_1.flow ( self.n_expected )
+        self.mid_recv_1.flow  ( self.n_expected )
+        self.far_recv_1.flow  ( self.n_expected )
+
+        self.near_recv_2.flow ( self.n_expected )
+        self.mid_recv_2.flow  ( self.n_expected )
+        self.far_recv_2.flow  ( self.n_expected )
+
+
+    def on_sendable ( self, event ):
+        msg = Message ( body     = "Hello, closest.",
+                        address  = self.dest
+                      )
+        event.sender.send ( msg )
+
+
+    def on_message ( self, event ):
+
+        self.n_received += 1
+
+        # Increment the near, mid, or far counts, depending on
+        # which receiver the message came in on.
+        if event.receiver == self.near_recv_1:
+            self.near_1        += 1
+        elif event.receiver == self.near_recv_2:
+            self.near_2        += 1
+        elif event.receiver == self.mid_recv_1:
+            self.mid_1         += 1
+        elif event.receiver == self.mid_recv_2:
+            self.mid_2         += 1
+        elif event.receiver == self.far_recv_1:
+            self.far_1         += 1
+        elif event.receiver == self.far_recv_2:
+            self.far_2         += 1
+
+        if self.n_received == self.one_third:
+            # The first one-third of messages should have gone exclusively
+            # to the near receivers.  At this point we should have
+            # no messages in the mid or far receivers.
+            self.near_recv_1.close()
+            self.near_recv_2.close()
+            if self.mid_1 + self.mid_2 + self.far_1 + self.far_2 > 0 :
+                self.bail ( "error: mid or far receivers got messages before near were closed." )
+            # Make sure we got one third of the messages.
+            if self.near_1 + self.near_2 < self.one_third:
+                self.bail ( "error: the near receivers got too few messages." )
+            # Make sure both receivers got some messages.
+            if self.near_1 * self.near_2 == 0:
+                self.bail ( "error: one of the near receivers got no messages." )
+
+        elif self.n_received == 2 * self.one_third:
+            # The next one-third of messages should have gone exclusively
+            # to the mid receivers.  At this point we should have
+            # no messages in the far receivers.
+            self.mid_recv_1.close()
+            self.mid_recv_2.close()
+            if self.far_1 + self.far_2 > 0 :
+                self.bail ( "error: far receivers got messages before mid were closed." )
+            # Make sure we got one third of the messages.
+            if self.mid_1 + self.mid_2 < self.one_third:
+                self.bail ( "error: the mid receivers got too few messages." )
+            # Make sure both receivers got some messages.
+            if self.mid_1 * self.mid_2 == 0:
+                self.bail ( "error: one of the mid receivers got no messages." )
+
+        # By the time we reach the expected number of messages
+        # we have closed the near and middle receivers.  If the far
+        # receivers are empty at this point, something is wrong.
+        if self.n_received >= self.n_expected :
+            if self.far_1 + self.far_2 < self.one_third:
+                self.bail ( "error: the far receivers got too few messages." )
+            if self.far_1 * self.far_2 == 0:
+                self.bail ( "error: one of the far receivers got no messages." )
+            else:
+                self.bail ( None )
+
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org