You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/07/24 13:09:05 UTC
qpid-dispatch git commit: make all receivers get messages
Repository: qpid-dispatch
Updated Branches:
refs/heads/master b38d81d0f -> ee4285f9b
make all receivers get messages
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ee4285f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ee4285f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ee4285f9
Branch: refs/heads/master
Commit: ee4285f9bcceb788d0cd5c7d4da01991733cdd55
Parents: b38d81d
Author: mick goulish <mg...@redhat.com>
Authored: Fri Jul 21 20:06:14 2017 -0400
Committer: mick goulish <mg...@redhat.com>
Committed: Fri Jul 21 20:06:14 2017 -0400
----------------------------------------------------------------------
tests/system_tests_three_routers.py | 188 ++++++++++++++++++++++++++++++-
1 file changed, 187 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ee4285f9/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index 289cc02..07ea8fd 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -48,7 +48,9 @@ class RouterTest(TestCase):
def router(name, more_config):
- config = [ ('router', {'mode': 'interior', 'id': name}) ] + more_config
+ config = [ ('router', {'mode': 'interior', 'id': name}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'})
+ ] + more_config
config = Qdrouterd.Config(config)
@@ -180,6 +182,7 @@ class RouterTest(TestCase):
cls.routers[2].wait_router_connected('B')
cls.A_normal_addr = cls.routers[0].addresses[0]
+ cls.B_normal_addr = cls.routers[1].addresses[0]
cls.C_normal_addr = cls.routers[2].addresses[0]
cls.sender_addr = cls.C_normal_addr
@@ -215,6 +218,15 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_05_closest ( self ):
+ test = ClosestTest ( self.A_normal_addr,
+ self.B_normal_addr,
+ self.C_normal_addr,
+ "addr_05"
+ )
+ test.run()
+ self.assertEqual(None, test.error)
+
@@ -624,5 +636,179 @@ class LinkAttachRouting ( MessagingHandler ):
container.run()
+
+class ClosestTest ( MessagingHandler ):
+ """
+ Test whether distance-based message routing works in a
+ linear 3-router network.
+
+ sender -----> NEAR -----> MID -----> FAR
+ | | |
+ v v v
+ near mid far
+ rcvrs rcvrs rcvrs
+
+ With a linear network of 3 routers, set up a sender on the
+ near one, and then 2 receivers each on the near, middle, and
+ far routers.
+ After the first third of the messages (50 of 150) have been
+ received, close the near receivers and check results so far.
+ All of those messages should have gone to the near receivers,
+ and none to the mid or far receivers.
+ After the next third have been received, close the two
+ middle receivers and check again. All of those messages should
+ have gone to the middle receivers, and none to the far ones.
+ Finally, once at least n_expected messages have been received,
+ check that the last third went to the far receivers.
+ """
+ def __init__ ( self, near_router, mid_router, far_router, addr_suffix ):
+ super ( ClosestTest, self ).__init__(prefetch=0)
+ self.error = None
+ self.near_router = near_router
+ self.mid_router = mid_router
+ self.far_router = far_router
+ self.addr_suffix = addr_suffix
+ self.dest = "closest/" + addr_suffix
+
+ # This n_expected is actually the minimum number of messages
+ # I will send. The real number will be higher because some
+ # will be released when I close the near and middle receivers.
+ self.n_expected = 150
+ self.one_third = self.n_expected / 3
+
+ # n_received -- the grand total -- is used to decide when to
+ # close the near receivers and later the middle ones.
+ self.n_received = 0
+
+ # Counters for the near, middle, and far receivers are used
+ # to determine whether there has been an error.
+ self.near_1 = 0
+ self.near_2 = 0
+ self.mid_1 = 0
+ self.mid_2 = 0
+ self.far_1 = 0
+ self.far_2 = 0
+
+
+ def timeout ( self ):
+ self.check_results ( )
+ self.bail ( "Timeout Expired " )
+
+
+ def bail ( self, text ):
+ self.timer.cancel()
+ self.error = text
+ self.send_cnx.close()
+ self.near_cnx.close()
+ self.mid_cnx.close()
+ self.far_cnx.close()
+
+
+ def on_start ( self, event ):
+ self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) )
+ self.send_cnx = event.container.connect ( self.near_router )
+ self.near_cnx = event.container.connect ( self.near_router )
+ self.mid_cnx = event.container.connect ( self.mid_router )
+ self.far_cnx = event.container.connect ( self.far_router )
+
+ self.sender = event.container.create_sender ( self.send_cnx, self.dest)
+
+ # The two receiver-links on each router must be given
+ # explicit distinct names, or we will in fact only get
+ # one link. And then wonder why receiver 2 on each
+ # router isn't getting any messages.
+ self.near_recv_1 = event.container.create_receiver ( self.near_cnx, self.dest, name="1" )
+ self.near_recv_2 = event.container.create_receiver ( self.near_cnx, self.dest, name="2" )
+
+ self.mid_recv_1 = event.container.create_receiver ( self.mid_cnx, self.dest, name="1" )
+ self.mid_recv_2 = event.container.create_receiver ( self.mid_cnx, self.dest, name="2" )
+
+ self.far_recv_1 = event.container.create_receiver ( self.far_cnx, self.dest, name="1" )
+ self.far_recv_2 = event.container.create_receiver ( self.far_cnx, self.dest, name="2" )
+
+ self.near_recv_1.flow ( self.n_expected )
+ self.mid_recv_1.flow ( self.n_expected )
+ self.far_recv_1.flow ( self.n_expected )
+
+ self.near_recv_2.flow ( self.n_expected )
+ self.mid_recv_2.flow ( self.n_expected )
+ self.far_recv_2.flow ( self.n_expected )
+
+
+ def on_sendable ( self, event ):
+ msg = Message ( body = "Hello, closest.",
+ address = self.dest
+ )
+ event.sender.send ( msg )
+
+
+ def on_message ( self, event ):
+
+ self.n_received += 1
+
+ # Increment the near, mid, or far counts, depending on
+ # which receiver the message came in on.
+ if event.receiver == self.near_recv_1:
+ self.near_1 += 1
+ elif event.receiver == self.near_recv_2:
+ self.near_2 += 1
+ elif event.receiver == self.mid_recv_1:
+ self.mid_1 += 1
+ elif event.receiver == self.mid_recv_2:
+ self.mid_2 += 1
+ elif event.receiver == self.far_recv_1:
+ self.far_1 += 1
+ elif event.receiver == self.far_recv_2:
+ self.far_2 += 1
+
+ if self.n_received == self.one_third:
+ # The first one-third of messages should have gone exclusively
+ # to the near receivers. At this point we should have
+ # no messages in the mid or far receivers.
+ self.near_recv_1.close()
+ self.near_recv_2.close()
+ if self.mid_1 + self.mid_2 + self.far_1 + self.far_2 > 0 :
+ self.bail ( "error: mid or far receivers got messages before near were closed." )
+ # Make sure we got one third of the messages.
+ if self.near_1 + self.near_2 < self.one_third:
+ self.bail ( "error: the near receivers got too few messages." )
+ # Make sure both receivers got some messages.
+ if self.near_1 * self.near_2 == 0:
+ self.bail ( "error: one of the near receivers got no messages." )
+
+ elif self.n_received == 2 * self.one_third:
+ # The next one-third of messages should have gone exclusively
+ # to the mid receivers. At this point we should have
+ # no messages in the far receivers.
+ self.mid_recv_1.close()
+ self.mid_recv_2.close()
+ if self.far_1 + self.far_2 > 0 :
+ self.bail ( "error: far receivers got messages before mid were closed." )
+ # Make sure we got one third of the messages.
+ if self.mid_1 + self.mid_2 < self.one_third:
+ self.bail ( "error: the mid receivers got too few messages." )
+ # Make sure both receivers got some messages.
+ if self.mid_1 * self.mid_2 == 0:
+ self.bail ( "error: one of the mid receivers got no messages." )
+
+ # By the time we reach the expected number of messages
+ # we have closed the near and middle receivers. If the far
+ # receivers are empty at this point, something is wrong.
+ if self.n_received >= self.n_expected :
+ if self.far_1 + self.far_2 < self.one_third:
+ self.bail ( "error: the far receivers got too few messages." )
+ if self.far_1 * self.far_2 == 0:
+ self.bail ( "error: one of the far receivers got no messages." )
+ else:
+ self.bail ( None )
+
+
+ def run(self):
+ container = Container(self)
+ container.run()
+
+
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org