You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2017/10/19 18:17:44 UTC
qpid-dispatch git commit: DISPATCH-209 : parallel waypoint test
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 121e4065c -> c895d1c44
DISPATCH-209 : parallel waypoint test
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c895d1c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c895d1c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c895d1c4
Branch: refs/heads/master
Commit: c895d1c440e9ad24d1ae4415f27e5f26ee26357e
Parents: 121e406
Author: mick goulish <mg...@redhat.com>
Authored: Thu Oct 19 14:16:21 2017 -0400
Committer: mick goulish <mg...@redhat.com>
Committed: Thu Oct 19 14:16:21 2017 -0400
----------------------------------------------------------------------
tests/system_tests_distribution.py | 883 +++++++++++++++++++++++++++-----
1 file changed, 745 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c895d1c4/tests/system_tests_distribution.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py
index 8084344..0156afc 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -121,8 +121,21 @@ class DistributionTests ( TestCase ):
cls.linkroute_prefix = "0.0.0.0/linkroute"
- cls.waypoint_prefix_1 = "0.0.0.0/queue_1"
- cls.waypoint_prefix_2 = "0.0.0.0/queue_2"
+ cls.waypoint_prefix_1 = "0.0.0.0/process_1"
+ cls.waypoint_prefix_2 = "0.0.0.0/process_2"
+ cls.waypoint_prefix_3 = "0.0.0.0/process_3"
+
+ #-----------------------------------------------------
+ # Container IDs are what associate route containers
+ # with links -- for the linkroute tests and the
+ # waypoint tests.
+ #-----------------------------------------------------
+ cls.container_ids = [ 'ethics_gradient',
+ 'honest_mistake',
+ 'frank_exchange_of_views',
+ 'zero_gravitas',
+ 'yawning_angel'
+ ]
#-----------------------------------------------------
# Here are some chunks of configuration that will be
@@ -134,87 +147,160 @@ class DistributionTests ( TestCase ):
( 'linkRoute',
{ 'prefix': cls.linkroute_prefix,
'dir': 'in',
- 'containerId': 'LinkRouteTest'
+ 'containerId': cls.container_ids[0]
}
),
( 'linkRoute',
{ 'prefix': cls.linkroute_prefix,
'dir': 'out',
- 'containerId': 'LinkRouteTest'
+ 'containerId': cls.container_ids[0]
}
)
]
- waypoint_configuration_1 = \
+ single_waypoint_configuration = \
[
- ( 'address',
- { 'prefix': cls.waypoint_prefix_1,
+ ( 'address',
+ { 'prefix': cls.waypoint_prefix_1,
'waypoint': 'yes'
}
),
- ( 'autoLink',
- { 'addr': cls.waypoint_prefix_1 + '.waypoint',
- 'containerId': 'WaypointTest',
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_1 + '.waypoint',
+ 'containerId': cls.container_ids[1],
'dir': 'in'
}
),
- ( 'autoLink',
+ ( 'autoLink',
{ 'addr': cls.waypoint_prefix_1 + '.waypoint',
- 'containerId': 'WaypointTest',
+ 'containerId': cls.container_ids[1],
'dir': 'out'
}
)
]
- waypoint_configuration_2 = \
+ #-------------------------------------------------------------------
+ # The phase-number is used by the router as an addition
+ # to the address for the link. To chain these two waypoints
+ # together in a serial fashion, we explicitly declare their
+ # phase numbers:
+ # Waypoint 1
+ # out of router to process: phase 0
+ # back from process to router: phase 1
+ # Waypoint 2
+ # out of router to process: phase 1
+ # back from process to router: phase 2
+ #
+ # Because of those two "phase 1" markings, messages coming back
+ # into the router from Waypoint 1 get routed back outbound to
+ # Waypoint 2.
+ #
+ # Because the address configuration specifies that phase 2 is
+ # the egress phase, messages coming into the router from that
+ # autolink are finally routed to the client receiver.
+ #-------------------------------------------------------------------
+ serial_waypoint_configuration = \
[
- ( 'address',
- { 'prefix': cls.waypoint_prefix_2,
+ ( 'address',
+ { 'prefix': cls.waypoint_prefix_2,
'ingressPhase' : 0, # into the waypoint-process
'egressPhase' : 2, # out of the waypoint process
}
),
- ( 'autoLink',
- { 'addr': cls.waypoint_prefix_2 + '.waypoint',
- 'phase' : 0,
- 'containerId': 'WaypointTest2',
+
+ # Waypoint 1 configuration --------------------------
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_2 + '.waypoint',
+ 'phase' : 0,
+ 'containerId': cls.container_ids[2],
'dir': 'out' # out-of-router
}
),
- ( 'autoLink',
+ ( 'autoLink',
{ 'addr': cls.waypoint_prefix_2 + '.waypoint',
- 'phase' : 1,
- 'containerId': 'WaypointTest2',
+ 'phase' : 1,
+ 'containerId': cls.container_ids[2],
'dir': 'in' # into-router
}
),
- ( 'autoLink',
+
+ # Waypoint 2 configuration --------------------------
+ ( 'autoLink',
{ 'addr': cls.waypoint_prefix_2 + '.waypoint',
'phase' : 1, # out-of-router
- 'containerId': 'WaypointTest2',
+ 'containerId': cls.container_ids[2],
'dir': 'out'
}
),
- ( 'autoLink',
+ ( 'autoLink',
{ 'addr': cls.waypoint_prefix_2 + '.waypoint',
'phase' : 2, # into-router
- 'containerId': 'WaypointTest2',
+ 'containerId': cls.container_ids[2],
+ 'dir': 'in'
+ }
+ )
+ ]
+
+
+ #-------------------------------------------------------------
+ # In a parallel waypoint configuration, we use the default
+ # phase numbers: toward the waypoint is phase 0, back from
+ # the waypoint into the router is phase 1.
+ # The address configuration, by saying 'waypoint: yes', is
+ # shorthand for "ingress is phase 0, egress is phase 1".
+ # By making two identical waypoints, they will be used in
+ # parallel rather than serial.
+ #-------------------------------------------------------------
+ parallel_waypoint_configuration = \
+ [
+ ( 'address',
+ { 'prefix': cls.waypoint_prefix_3,
+ 'waypoint': 'yes'
+ }
+ ),
+
+ # Waypoint 1 configuration ----------------------
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_3 + '.waypoint',
+ 'containerId': cls.container_ids[3],
+ 'dir': 'in'
+ }
+ ),
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_3 + '.waypoint',
+ 'containerId': cls.container_ids[3],
+ 'dir': 'out'
+ }
+ ),
+
+ # Waypoint 2 configuration ----------------------
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_3 + '.waypoint',
+ 'containerId': cls.container_ids[3],
'dir': 'in'
}
+ ),
+ ( 'autoLink',
+ { 'addr': cls.waypoint_prefix_3 + '.waypoint',
+ 'containerId': cls.container_ids[3],
+ 'dir': 'out'
+ }
)
]
+
def router(name, more_config):
config = [ ('router', {'mode': 'interior', 'id': name}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'})
- ] \
- + linkroute_configuration \
- + waypoint_configuration_1 \
- + waypoint_configuration_2 \
+ ] \
+ + linkroute_configuration \
+ + single_waypoint_configuration \
+ + serial_waypoint_configuration \
+ + parallel_waypoint_configuration \
+ more_config
config = Qdrouterd.Config(config)
@@ -240,7 +326,7 @@ class DistributionTests ( TestCase ):
# Note: in the above picture, an arrow from, e.g., B to A
# means that B initiates the connection from itself to A.
# So if you see "B ----> A" in the picture, you should also
- # see a connector block in the configuration of B that
+ # see a connector block in the configuration of B that
# connects to an inter-router port on A.
#
@@ -416,46 +502,97 @@ class DistributionTests ( TestCase ):
cls.C_addr = router_C.addresses[0]
cls.D_addr = router_D.addresses[0]
+ # 1 means skip that test.
+ cls.skip = { 'test_01' : 0,
+ 'test_02' : 0,
+ 'test_03' : 0,
+ 'test_04' : 0,
+ 'test_05' : 0,
+ 'test_06' : 0,
+ 'test_07' : 0,
+ 'test_08' : 0,
+ 'test_09' : 0,
+ 'test_10' : 0,
+ 'test_11' : 0,
+ 'test_12' : 0,
+ 'test_13' : 0,
+ 'test_14' : 0,
+ 'test_15' : 0,
+ 'test_16' : 0,
+ 'test_17' : 0,
+ 'test_18' : 0,
+ 'test_19' : 0,
+ 'test_20' : 0,
+ 'test_21' : 0,
+ 'test_22' : 0,
+ 'test_23' : 0,
+ 'test_24' : 0,
+ 'test_25' : 0
+ }
+
def test_01_targeted_sender_AC ( self ):
- test = TargetedSenderTest ( self.A_addr, self.C_addr, "closest/01" )
+ name = 'test_01'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = TargetedSenderTest ( name, self.A_addr, self.C_addr, "closest/01" )
test.run()
self.assertEqual ( None, test.error )
def test_02_targeted_sender_DC ( self ):
- test = TargetedSenderTest ( self.D_addr, self.C_addr, "closest/02" )
+ name = 'test_02'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = TargetedSenderTest ( name, self.D_addr, self.C_addr, "closest/02" )
test.run()
self.assertEqual ( None, test.error )
def test_03_anonymous_sender_AC ( self ):
- test = AnonymousSenderTest ( self.A_addr, self.C_addr )
+ name = 'test_03'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = AnonymousSenderTest ( name, self.A_addr, self.C_addr )
test.run()
self.assertEqual ( None, test.error )
def test_04_anonymous_sender_DC ( self ):
- test = AnonymousSenderTest ( self.D_addr, self.C_addr )
+ name = 'test_04'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = AnonymousSenderTest ( name, self.D_addr, self.C_addr )
test.run()
self.assertEqual ( None, test.error )
def test_05_dynamic_reply_to_AC ( self ):
- test = DynamicReplyTo ( self.A_addr, self.C_addr )
+ name = 'test_05'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = DynamicReplyTo ( name, self.A_addr, self.C_addr )
test.run()
self.assertEqual ( None, test.error )
def test_06_dynamic_reply_to_DC ( self ):
- test = DynamicReplyTo ( self.D_addr, self.C_addr )
+ name = 'test_06'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = DynamicReplyTo ( name, self.D_addr, self.C_addr )
test.run()
self.assertEqual ( None, test.error )
def test_07_linkroute ( self ):
- test = LinkAttachRouting ( self.C_addr,
+ name = 'test_07'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = LinkAttachRouting ( name,
+ self.container_ids[0],
+ self.C_addr,
self.A_route_container_addr,
self.linkroute_prefix,
"addr_07"
@@ -465,7 +602,12 @@ class DistributionTests ( TestCase ):
def test_08_linkroute_check_only ( self ):
- test = LinkAttachRoutingCheckOnly ( self.C_addr,
+ name = 'test_08'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = LinkAttachRoutingCheckOnly ( name,
+ self.container_ids[0],
+ self.C_addr,
self.A_route_container_addr,
self.linkroute_prefix,
"addr_08"
@@ -475,7 +617,11 @@ class DistributionTests ( TestCase ):
def test_09_closest_linear ( self ):
- test = ClosestTest ( self.A_addr,
+ name = 'test_09'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = ClosestTest ( name,
+ self.A_addr,
self.B_addr,
self.C_addr,
"addr_09"
@@ -485,7 +631,11 @@ class DistributionTests ( TestCase ):
def test_10_closest_mesh ( self ):
- test = ClosestTest ( self.A_addr,
+ name = 'test_10'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = ClosestTest ( name,
+ self.A_addr,
self.B_addr,
self.D_addr,
"addr_10"
@@ -562,6 +712,9 @@ class DistributionTests ( TestCase ):
#
def test_11_balanced_linear ( self ):
+ name = 'test_11'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
# slop is how much the second two values may diverge from
# the expected. But they still must sum to total - A.
total = 100
@@ -575,7 +728,8 @@ class DistributionTests ( TestCase ):
slop = 1
omit_middle_receiver = False
- test = BalancedTest ( self.A_addr,
+ test = BalancedTest ( name,
+ self.A_addr,
self.B_addr,
self.C_addr,
"addr_11",
@@ -591,6 +745,9 @@ class DistributionTests ( TestCase ):
def test_12_balanced_linear_omit_middle_receiver ( self ):
+ name = 'test_12'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
# If we omit the middle receiver, then router A will count
# up to cost ( A, B ) and then keep counting up a further
# cost ( B, C ) before it starts to spill over.
@@ -612,7 +769,8 @@ class DistributionTests ( TestCase ):
slop = 1
omit_middle_receiver = True
- test = BalancedTest ( self.A_addr,
+ test = BalancedTest ( name,
+ self.A_addr,
self.B_addr,
self.C_addr,
"addr_12",
@@ -684,13 +842,17 @@ class DistributionTests ( TestCase ):
# 3. B and D are both within 1 of their expected values.
#
def test_13_balanced_mesh ( self ):
+ name = 'test_13'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
total = 100
expected_A = 54
expected_B = 43
expected_D = 3
slop = 1
omit_middle_receiver = False
- test = BalancedTest ( self.A_addr,
+ test = BalancedTest ( name,
+ self.A_addr,
self.B_addr,
self.D_addr,
"addr_13",
@@ -706,7 +868,11 @@ class DistributionTests ( TestCase ):
def test_14_multicast_linear ( self ):
- test = MulticastTest ( self.A_addr,
+ name = 'test_14'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = MulticastTest ( name,
+ self.A_addr,
self.B_addr,
self.C_addr,
"addr_14"
@@ -716,7 +882,11 @@ class DistributionTests ( TestCase ):
def test_15_multicast_mesh ( self ):
- test = MulticastTest ( self.A_addr,
+ name = 'test_15'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = MulticastTest ( name,
+ self.A_addr,
self.B_addr,
self.D_addr,
"addr_15"
@@ -726,6 +896,9 @@ class DistributionTests ( TestCase ):
def test_16_linkroute_linear_all_local ( self ) :
+ name = 'test_16'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
This test should route all senders' link-attaches
to the local containers on router A.
@@ -796,15 +969,16 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 16"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
@@ -812,6 +986,9 @@ class DistributionTests ( TestCase ):
def test_17_linkroute_linear_all_B ( self ) :
+ name = 'test_17'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
This test should route all senders' link-attaches
to the remote connections on router B.
@@ -882,15 +1059,16 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 17"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
@@ -898,6 +1076,9 @@ class DistributionTests ( TestCase ):
def test_18_linkroute_linear_all_C ( self ) :
+ name = 'test_18'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
This test should route all senders' link-attaches
to the remote connections on router C.
@@ -968,21 +1149,25 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 18"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
def test_19_linkroute_linear_kill ( self ) :
+ name = 'test_19'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
Start out as usual, making four senders and seeing their link-attaches
routed to router A (local). But then kill the two route-container
@@ -1113,15 +1298,16 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 19"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
@@ -1129,6 +1315,9 @@ class DistributionTests ( TestCase ):
def test_20_linkroute_mesh_all_local ( self ) :
+ name = 'test_20'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
c c
senders ---> A --------- B
@@ -1213,21 +1402,25 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 20"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
def test_21_linkroute_mesh_nonlocal ( self ) :
+ name = 'test_21'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
c
senders ---> A --------- B
@@ -1312,15 +1505,16 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 21"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
@@ -1329,6 +1523,9 @@ class DistributionTests ( TestCase ):
def test_22_linkroute_mesh_kill ( self ) :
+ name = 'test_22'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
"""
c c
senders ---> A --------- B
@@ -1468,22 +1665,28 @@ class DistributionTests ( TestCase ):
}
]
- test = RoutingTest ( self.A_addr, # all senders are attached here
+ test = RoutingTest ( name,
+ self.container_ids[0],
+ self.A_addr, # all senders are attached here
routers,
self.linkroute_prefix,
addr_suffix,
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- "Test 22"
+ n_remote_routers
)
test.run ( )
self.assertEqual ( None, test.error )
def test_23_waypoint ( self ) :
- test = WaypointTest ( self.A_addr,
+ name = 'test_23'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = WaypointTest ( name,
+ self.container_ids[1],
+ self.A_addr,
self.B_addr,
self.C_route_container_addr,
self.waypoint_prefix_1 + '.waypoint'
@@ -1493,7 +1696,12 @@ class DistributionTests ( TestCase ):
def test_24_serial_waypoint_test ( self ):
- test = SerialWaypointTest ( self.A_addr,
+ name = 'test_24'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = SerialWaypointTest ( name,
+ self.container_ids[2],
+ self.A_addr,
self.B_addr,
self.C_route_container_addr,
self.waypoint_prefix_2 + '.waypoint'
@@ -1501,7 +1709,19 @@ class DistributionTests ( TestCase ):
test.run()
self.assertEqual(None, test.error)
-
+ def test_25_parallel_waypoint_test ( self ):
+ name = 'test_25'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = ParallelWaypointTest ( name,
+ self.container_ids[3],
+ self.A_addr,
+ self.B_addr,
+ self.C_route_container_addr,
+ self.waypoint_prefix_3 + '.waypoint'
+ )
+ test.run()
+ self.assertEqual(None, test.error)
@@ -1517,7 +1737,7 @@ class TargetedSenderTest ( MessagingHandler ):
address we want to send to. (As opposed to letting the router
pass back an address to us.)
"""
- def __init__ ( self, send_addr, recv_addr, destination ):
+ def __init__ ( self, test_name, send_addr, recv_addr, destination ):
super(TargetedSenderTest, self).__init__(prefetch=0)
self.send_addr = send_addr
self.recv_addr = recv_addr
@@ -1529,6 +1749,7 @@ class TargetedSenderTest ( MessagingHandler ):
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
+ self.test_name = test_name
def timeout(self):
@@ -1592,7 +1813,7 @@ class AnonymousSenderTest ( MessagingHandler ):
information to us when we get the on_link_opened event.
"""
- def __init__(self, send_addr, recv_addr):
+ def __init__(self, test_name, send_addr, recv_addr):
super(AnonymousSenderTest, self).__init__()
self.send_addr = send_addr
self.recv_addr = recv_addr
@@ -1608,6 +1829,7 @@ class AnonymousSenderTest ( MessagingHandler ):
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
+ self.test_name = test_name
def timeout ( self ):
@@ -1673,7 +1895,7 @@ class DynamicReplyTo(MessagingHandler):
the expected number of replies, or with failure if we time out before
that happens.
"""
- def __init__(self, client_addr, server_addr):
+ def __init__(self, test_name, client_addr, server_addr):
super(DynamicReplyTo, self).__init__(prefetch=10)
self.client_addr = client_addr
self.server_addr = server_addr
@@ -1687,6 +1909,7 @@ class DynamicReplyTo(MessagingHandler):
self.n_sent = 0
self.received_by_server = 0
self.received_by_client = 0
+ self.test_name = test_name
def timeout(self):
@@ -1752,7 +1975,14 @@ class DynamicReplyTo(MessagingHandler):
class LinkAttachRoutingCheckOnly ( MessagingHandler ):
"""
"""
- def __init__ ( self, client_host, linkroute_container_host, linkroute_prefix, addr_suffix ):
+ def __init__ ( self,
+ test_name,
+ container_id,
+ client_host,
+ linkroute_container_host,
+ linkroute_prefix,
+ addr_suffix
+ ):
super ( LinkAttachRoutingCheckOnly, self ).__init__(prefetch=0)
self.client_host = client_host
self.linkroute_container_host = linkroute_container_host
@@ -1766,6 +1996,8 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ):
self.linkroute_check_timer = None
self.linkroute_check_receiver = None
self.linkroute_check_sender = None
+ self.test_name = test_name
+ self.container_id = container_id
self.debug = False
@@ -1776,8 +2008,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ):
def timeout ( self ):
- self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" %
- (self.n_sent, self.n_rcvd, self.n_settled) )
+ self.bail ( "Timeout Expired" )
def address_check_timeout(self):
@@ -1816,7 +2047,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ):
if event.receiver == self.linkroute_check_receiver:
event.receiver.flow(30)
# Because we created the linkroute_check_receiver 'dynamic', when it opens
- # it will have its address filled in. That is the address we want our
+ # it will have its address filled in. That is the address we want our
# AddressChecker replies to go to.
self.linkroute_checker = AddressChecker ( self.linkroute_check_receiver.remote_source.address )
self.linkroute_check()
@@ -1855,7 +2086,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ):
def run(self):
container = Container(self)
- container.container_id = 'LinkRouteTest'
+ container.container_id = self.container_id
container.run()
@@ -1867,7 +2098,14 @@ class LinkAttachRouting ( MessagingHandler ):
the route container will connect to, and it will receive our messages.
The near host is what our sender will attach to.
"""
- def __init__ ( self, nearside_host, farside_host, linkroute_prefix, addr_suffix ):
+ def __init__ ( self,
+ test_name,
+ container_id,
+ nearside_host,
+ farside_host,
+ linkroute_prefix,
+ addr_suffix
+ ):
super ( LinkAttachRouting, self ).__init__(prefetch=0)
self.nearside_host = nearside_host
self.farside_host = farside_host
@@ -1883,10 +2121,12 @@ class LinkAttachRouting ( MessagingHandler ):
self.linkroute_check_receiver = None
self.linkroute_check_sender = None
- self.count = 10
- self.n_sent = 0
- self.n_rcvd = 0
- self.n_settled = 0
+ self.count = 10
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_settled = 0
+ self.test_name = test_name
+ self.container_id = container_id
def timeout ( self ):
@@ -2003,11 +2243,13 @@ class LinkAttachRouting ( MessagingHandler ):
def run(self):
container = Container(self)
- container.container_id = 'LinkRouteTest'
+ container.container_id = self.container_id
container.run()
+
+
class ClosestTest ( MessagingHandler ):
"""
Test whether distance-based message routing works in a
@@ -2026,7 +2268,7 @@ class ClosestTest ( MessagingHandler ):
router_1, and then 2 receivers each on all 3 routers.
"""
- def __init__ ( self, router_1, router_2, router_3, addr_suffix ):
+ def __init__ ( self, test_name, router_1, router_2, router_3, addr_suffix ):
super ( ClosestTest, self ).__init__(prefetch=0)
self.error = None
self.router_1 = router_1
@@ -2053,7 +2295,8 @@ class ClosestTest ( MessagingHandler ):
self.addr_check_timer = None
self.addr_check_receiver = None
self.addr_check_sender = None
- self.bailed = False
+ self.bailed = False
+ self.test_name = test_name
def timeout ( self ):
self.bail ( "Timeout Expired " )
@@ -2224,7 +2467,19 @@ class BalancedTest ( MessagingHandler ):
( Slop can happen in some topologies when you can't tell whether spillover
will happen first to node 2, or to node 3. )
"""
- def __init__ ( self, router_1, router_2, router_3, addr_suffix, total_messages, expected_1, expected_2, expected_3, slop, omit_middle_receiver ):
+ def __init__ ( self,
+ test_name,
+ router_1,
+ router_2,
+ router_3,
+ addr_suffix,
+ total_messages,
+ expected_1,
+ expected_2,
+ expected_3,
+ slop,
+ omit_middle_receiver
+ ):
super ( BalancedTest, self ).__init__(prefetch=0, auto_accept=False)
self.error = None
self.router_3 = router_3
@@ -2256,6 +2511,7 @@ class BalancedTest ( MessagingHandler ):
self.address_check_sender = None
self.payload_sender = None
+ self.test_name = test_name
def timeout ( self ):
@@ -2384,7 +2640,13 @@ class MulticastTest ( MessagingHandler ):
Using multicast, we should see all receivers get everything,
whether the topology is linear or mesh.
"""
- def __init__ ( self, router_1, router_2, router_3, addr_suffix ):
+ def __init__ ( self,
+ test_name,
+ router_1,
+ router_2,
+ router_3,
+ addr_suffix
+ ):
super ( MulticastTest, self ).__init__(prefetch=0)
self.error = None
self.router_1 = router_1
@@ -2409,7 +2671,8 @@ class MulticastTest ( MessagingHandler ):
self.addr_check_receiver = None
self.addr_check_sender = None
self.sender = None
- self.bailed = False
+ self.bailed = False
+ self.test_name = test_name
def timeout ( self ):
self.bail ( "Timeout Expired " )
@@ -2567,9 +2830,9 @@ class RoutingTest ( MessagingHandler ):
"""
# NOTE that no payload messages are sent in this test! I send some
- # management messages to see when the router network is ready for
- # me, but other than that, all I care about is the link-attaches
- # that happen each time I make a sender -- and where they are
+ # management messages to see when the router network is ready for
+ # me, but other than that, all I care about is the link-attaches
+ # that happen each time I make a sender -- and where they are
# routed to.
# NOTE about STEP comments: take a look at comments marked with the
@@ -2578,6 +2841,8 @@ class RoutingTest ( MessagingHandler ):
# list that is passed in from the caller.
def __init__ ( self,
+ test_name,
+ container_id,
sender_host,
route_container_addrs,
linkroute_prefix,
@@ -2585,8 +2850,7 @@ class RoutingTest ( MessagingHandler ):
instructions,
where_to_make_connections,
n_local_containers,
- n_remote_routers,
- test_name
+ n_remote_routers
):
super ( RoutingTest, self ).__init__(prefetch=0)
@@ -2643,6 +2907,7 @@ class RoutingTest ( MessagingHandler ):
self.sent_address_ready = False
self.status = 'start up'
+ self.container_id = container_id
def debug_print ( self, message ) :
@@ -3063,7 +3328,7 @@ class RoutingTest ( MessagingHandler ):
def run(self):
container = Container(self)
- container.container_id = 'LinkRouteTest'
+ container.container_id = self.container_id
container.run()
@@ -3073,14 +3338,21 @@ class RoutingTest ( MessagingHandler ):
class WaypointTest ( MessagingHandler ):
"""
- Messages from a client sender to a client receiver are first
+ Messages from a client sender to a client receiver are first
diverted out of the router into a separate waypoint receiver,
- which stores them in a fifo. This simulates reception by a
+ which stores them in a fifo. This simulates reception by a
broker or some other arbitrary process.
- The message then returns from the waypoint sender back to the
+ The message then returns from the waypoint sender back to the
router, and then arrives at the client receiver.
"""
- def __init__ ( self, client_host_1, client_host_2, route_container_host, destination ):
+ def __init__ ( self,
+ test_name,
+ container_id,
+ client_host_1,
+ client_host_2,
+ route_container_host,
+ destination
+ ):
super(WaypointTest, self).__init__()
self.client_host_1 = client_host_1
self.client_host_2 = client_host_2
@@ -3093,10 +3365,11 @@ class WaypointTest ( MessagingHandler ):
self.waypoint_sender = None
self.waypoint_receiver = None
self.waypoint_queue = []
+ self.container_id = container_id
self.messages_per_sender = 10
- self.senders = [
+ self.senders = [
{
'sender' : None,
'to_send' : 0,
@@ -3109,7 +3382,7 @@ class WaypointTest ( MessagingHandler ):
'n_sent' : 0
}
]
- self.receivers = [
+ self.receivers = [
{
'receiver' : None,
'n_received' : 0
@@ -3135,7 +3408,8 @@ class WaypointTest ( MessagingHandler ):
# the waypoint.
self.n_expected_transitions = self.messages_per_sender * self.n_senders * 4
- self.debug = False
+ self.debug = False
+ self.test_name = test_name
def timeout(self):
@@ -3191,9 +3465,11 @@ class WaypointTest ( MessagingHandler ):
def on_start ( self, event ):
self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) )
- self.client_connection = event.container.connect ( self.client_host_1 )
+ self.client_connection = event.container.connect ( self.client_host_1 )
- # Creating this connection is what gets things started.
+ # Creating this connection is what gets things started. When we make this
+ # connection to a route container address, the router will look at our
+ # containerId, and will at that time instantiate any associated autolinks.
self.route_container_connection = event.container.connect ( self.route_container_host )
self.debug_print ( " creating clients for connection" )
@@ -3201,14 +3477,14 @@ class WaypointTest ( MessagingHandler ):
sender = self.senders[i]
receiver = self.receivers[i]
- sender['sender'] = event.container.create_sender ( self.client_connection,
- self.destination,
+ sender['sender'] = event.container.create_sender ( self.client_connection,
+ self.destination,
name="sender_%d" % i)
sender['to_send'] = self.messages_per_sender
sender['n_sent'] = 0
- receiver['receiver'] = event.container.create_receiver ( self.client_connection,
- self.destination,
+ receiver['receiver'] = event.container.create_receiver ( self.client_connection,
+ self.destination,
name="receiver_%d" % i)
receiver['n_received'] = 0
@@ -3247,7 +3523,7 @@ class WaypointTest ( MessagingHandler ):
return
if event.sender == self.waypoint_sender :
self.send_from_waypoint ( )
-
+
def on_message ( self, event ):
self.debug_print ( "on_message ---------------------------- " )
@@ -3280,7 +3556,7 @@ class WaypointTest ( MessagingHandler ):
def run(self):
container = Container(self)
- container.container_id = 'WaypointTest'
+ container.container_id = self.container_id
container.run()
@@ -3289,32 +3565,40 @@ class WaypointTest ( MessagingHandler ):
class SerialWaypointTest ( MessagingHandler ):
"""
- Messages from a client sender on their way to a client receiver are
+ Messages from a client sender on their way to a client receiver are
first re-routed to two separate waypoint 'processes', in serial.
The waypoint processes are simulated in this test by separate 'waypoint'
receivers that store the messages in fifo lists, and separate 'waypoint'
- senders that pop them off the fifos and send them. This simulates
+ senders that pop them off the fifos and send them. This simulates
either a broker, or some arbitrary processing on the message.
"""
- def __init__ ( self, client_host_1, client_host_2, route_container_host, destination ):
+ def __init__ ( self,
+ test_name,
+ container_id,
+ client_host_1,
+ client_host_2,
+ route_container_host,
+ destination
+ ):
super(SerialWaypointTest, self).__init__()
self.client_host_1 = client_host_1
self.client_host_2 = client_host_2
self.route_container_host = route_container_host
self.destination = destination
+ self.sender_connections = []
+ self.error = None
+ self.messages_per_sender = 100
+ self.container_id = container_id
- self.sender_connections = []
self.route_container_connection = None
- self.error = None
- self.messages_per_sender = 100
# There are 2 sending clients and 2 receiving clients
- # only because I wanted to have more than 1, and 2
- # appeared to be the next available integer.
- # This has nothing to do with the fact that there are
+ # only because I wanted to have more than 1, and 2
+ # appeared to be the next available integer.
+ # This has nothing to do with the fact that there are
# 2 waypoints.
- self.senders = [
+ self.senders = [
{ 'sender' : None,
'to_send' : self.messages_per_sender,
'n_sent' : 0
@@ -3325,7 +3609,7 @@ class SerialWaypointTest ( MessagingHandler ):
}
]
- self.receivers = [
+ self.receivers = [
{ 'receiver' : None,
'n_received' : 0
},
@@ -3361,15 +3645,16 @@ class SerialWaypointTest ( MessagingHandler ):
self.n_thru = 0
self.n_transitions = 0
self.n_expected_received = self.messages_per_sender * len(self.senders)
-
- # Each message is sent from one client sender, and finally received
- # by one client receiver. In the meantime in goes into, and then
+
+ # Each message is sent from one client sender, and finally received
+ # by one client receiver. In the meantime in goes into, and then
# comes back out of, two separate waypoints. That's a total of
# six links -- or 'transitions' -- for each message.
n_links_per_message = 2 + 2 * len(self.waypoints)
self.n_expected_transitions = len(self.senders) * self.messages_per_sender * n_links_per_message
- self.debug = False
+ self.debug = False
+ self.test_name = test_name
def timeout(self):
@@ -3419,22 +3704,25 @@ class SerialWaypointTest ( MessagingHandler ):
self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) )
self.sender_connections.append ( event.container.connect(self.client_host_1) )
self.sender_connections.append ( event.container.connect(self.client_host_2) )
- # Creating this connection is what gets things started.
+ # Creating this connection is what gets things started. When we make this
+ # connection to a route container address, the router will look at our
+ # containerId, and will at that time instantiate any associated autolinks.
self.route_container_connection = event.container.connect ( self.route_container_host )
+
for i in range(len(self.sender_connections)) :
- cnx = self.sender_connections[i]
+ cnx = self.sender_connections[i]
sender = self.senders[i]
receiver = self.receivers[i]
- sender['sender'] = event.container.create_sender ( cnx,
- self.destination,
+ sender['sender'] = event.container.create_sender ( cnx,
+ self.destination,
name="sender_%d" % i)
sender['to_send'] = self.messages_per_sender
sender['n_sent'] = 0
-
- receiver['receiver'] = event.container.create_receiver ( cnx,
- self.destination,
+
+ receiver['receiver'] = event.container.create_receiver ( cnx,
+ self.destination,
name="receiver_%d" % i)
receiver['n_received'] = 0
@@ -3462,7 +3750,7 @@ class SerialWaypointTest ( MessagingHandler ):
if self.n_waypoint_receivers < 2 :
self.waypoints[self.n_waypoint_receivers]['receiver'] = event.receiver
self.n_waypoint_receivers += 1
-
+
def on_sendable ( self, event ):
@@ -3551,7 +3839,327 @@ class SerialWaypointTest ( MessagingHandler ):
if total_actual_waypoint_receptions != total_expected_waypoint_receptions :
self.bail ( "total waypoint receptions were %d, but %d were expected." % ( total_actual_waypoint_receptions, total_expected_waypoint_receptions) )
return
-
+
+ total_messages_received = 0
+ for i in range(len(self.receivers)) :
+ this_receiver_got = self.receivers[i]['n_received']
+ total_messages_received += this_receiver_got
+
+ if total_messages_received != total_messages_sent :
+ self.bail ( "total_messages_received: %d but %d were expected." % (total_messages_received, total_messages_sent) )
+ return
+
+ self.debug_print ( "\nsuccess\n" )
+ self.bail ( None )
+
+
+
+ def report ( self ) :
+ print "\n\n==========================================================\nreport\n"
+
+ for i in range(len(self.senders)) :
+ print " client sender %d sent %d messages." % ( i, self.senders[i]['n_sent'] )
+
+ print "\n"
+
+ for i in range(len(self.waypoints)) :
+ print " waypoint %d received %d messages." % ( i, self.waypoints[i]['n_received'] )
+ print " waypoint %d sent %d messages." % ( i, self.waypoints[i]['n_sent'] )
+
+ print "\n"
+
+ for i in range(len(self.receivers)) :
+ print " client receiver %d received %d messages." % ( i, self.receivers[i]['n_received'] )
+
+ print "\nend report\n=========================================================\n\n"
+
+
+
+ def run(self):
+ container = Container(self)
+ container.container_id = self.container_id
+ container.run()
+
+
+
+
+class ParallelWaypointTest ( MessagingHandler ):
+ """
+ Messages from a client sender on their way to a client receiver are
+ first re-routed to one of two separate waypoint 'processes', in parallel.
+ The waypoint processes are simulated in this test by separate 'waypoint'
+ receivers that store the messages in fifo lists, and separate 'waypoint'
+ senders that pop them off the fifos and send them. This simulates
+ either a broker, or some arbitrary processing on the message.
+ """
+ def __init__ ( self,
+ test_name,
+ container_id,
+ client_host_1,
+ client_host_2,
+ route_container_host,
+ destination
+ ):
+ super ( ParallelWaypointTest, self ). __init__()
+ self.client_host_1 = client_host_1
+ self.client_host_2 = client_host_2
+ self.route_container_host = route_container_host
+ self.destination = destination
+ self.sender_connections = []
+ self.error = None
+ self.messages_per_sender = 100
+ self.container_id = container_id
+
+ self.route_container_connection = None
+
+ self.senders = [
+ { 'sender' : None,
+ 'to_send' : self.messages_per_sender,
+ 'n_sent' : 0
+ },
+ { 'sender' : None,
+ 'to_send' : self.messages_per_sender,
+ 'n_sent' : 0
+ }
+ ]
+
+ self.receivers = [
+ { 'receiver' : None,
+ 'n_received' : 0
+ },
+ { 'receiver' : None,
+ 'n_received' : 0
+ }
+ ]
+
+ self.n_waypoint_senders = 0
+ self.n_waypoint_receivers = 0
+
+ self.waypoints = [
+ { 'sender' : None,
+ 'n_sent' : 0,
+ 'receiver' : None,
+ 'n_received' : 0,
+ 'queue' : [],
+ 'n_sent' : 0,
+ 'name' : '1'
+ },
+ { 'sender' : None,
+ 'n_sent' : 0,
+ 'receiver' : None,
+ 'n_received' : 0,
+ 'queue' : [],
+ 'n_sent' : 0,
+ 'name' : '2'
+ }
+ ]
+
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_thru = 0
+ self.n_transitions = 0
+ self.n_expected_received = self.messages_per_sender * len(self.senders)
+
+ # Each message is sent from one client sender, and finally received
+ # by one client receiver. In the meantime it goes into, and then
+ # comes back out of, ONE waypoint. That's a total of
+ # four links -- or 'transitions' -- for each message.
+ n_links_per_message = 4
+ self.n_expected_transitions = len(self.senders) * self.messages_per_sender * n_links_per_message
+
+ self.debug = False
+
+ self.test_name = test_name
+
+
+ def timeout(self):
+ self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_thru=%d" % (self.n_sent, self.n_rcvd, self.n_thru) )
+
+
+ def bail ( self, text ):
+ self.error = text
+ self.route_container_connection.close()
+ for cnx in self.sender_connections :
+ cnx.close()
+ self.timer.cancel()
+
+
+ def debug_print ( self, message ) :
+ if self.debug :
+ print message
+
+
+ def send_from_client ( self, sender, n_messages, sender_index ):
+ n_sent = 0
+ while sender.credit > 0 and n_sent < n_messages:
+ msg = Message ( body=n_sent )
+ sender.send ( msg )
+ n_sent += 1
+ self.n_sent += 1
+ self.n_transitions += 1
+ self.debug_print ( "send_from_client -- sender: %d n_sent: %d" % ( sender_index, n_sent ) )
+
+
+
+ def send_from_waypoint ( self, waypoint ):
+ self.debug_print ( "send_from_waypoint ------------------------------" )
+
+ while waypoint['sender'].credit > 0 and len(waypoint['queue']) > 0:
+ m = waypoint['queue'].pop()
+ message_content_number = m.body
+ waypoint['sender'].send ( m )
+ waypoint['n_sent'] += 1
+ self.n_thru += 1
+ self.n_transitions += 1
+ self.debug_print ( "send_from_waypoint %s is %d " % ( waypoint['name'], message_content_number) )
+
+
+
+ def on_start ( self, event ):
+ self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) )
+ self.sender_connections.append ( event.container.connect(self.client_host_1) )
+ self.sender_connections.append ( event.container.connect(self.client_host_2) )
+ # Creating this connection is what gets things started. When we make this
+ # connection to a route container address, the router will look at our
+ # containerId, and will at that time instantiate any associated autolinks.
+ # We will get an 'on_link_opening' for each of them.
+ self.route_container_connection = event.container.connect ( self.route_container_host )
+
+ for i in range(len(self.sender_connections)) :
+ cnx = self.sender_connections[i]
+ sender = self.senders[i]
+ receiver = self.receivers[i]
+
+ sender['sender'] = event.container.create_sender ( cnx,
+ self.destination,
+ name="sender_%d" % i)
+ sender['to_send'] = self.messages_per_sender
+ sender['n_sent'] = 0
+ receiver['receiver'] = event.container.create_receiver ( cnx,
+ self.destination,
+ name="receiver_%d" % i)
+ receiver['n_received'] = 0
+
+
+ def on_link_opening ( self, event ):
+
+ self.debug_print ( "on_link_opening -------------------------- " )
+
+ if event.sender:
+ self.debug_print ( " sender: %s" % event.sender.remote_source.address )
+ event.sender.source.address = event.sender.remote_source.address
+ event.sender.open()
+ if event.sender.remote_source.address == self.destination:
+ if self.n_waypoint_senders < 2 :
+ self.debug_print ( " store this as one of my waypoint senders." )
+ self.waypoints[self.n_waypoint_senders]['sender'] = event.sender
+ self.n_waypoint_senders += 1
+
+ elif event.receiver:
+ self.debug_print ( " receiver: %s" % event.receiver.remote_target.address )
+ event.receiver.target.address = event.receiver.remote_target.address
+ event.receiver.open()
+ if event.receiver.remote_target.address == self.destination:
+ self.debug_print ( " store this as one of my waypoint receivers." )
+ if self.n_waypoint_receivers < 2 :
+ self.waypoints[self.n_waypoint_receivers]['receiver'] = event.receiver
+ self.n_waypoint_receivers += 1
+
+
+
+ def on_sendable ( self, event ):
+ self.debug_print ( "on_sendable ------------------------------" )
+ for index in range(len(self.senders)) :
+ sender = self.senders[index]
+ if event.sender == sender['sender'] :
+ self.debug_print ( " client sender %d" % index )
+ if sender['n_sent'] < sender['to_send'] :
+ self.debug_print ( " sending %d" % sender['to_send'] )
+ self.send_from_client ( sender['sender'], sender['to_send'], index )
+ sender['n_sent'] = sender['to_send'] # n_sent = n_to_send
+ else :
+ self.debug_print ( " this sender is already finished." )
+ return
+
+ for j in range(len(self.waypoints)) :
+ sender = self.waypoints[j]['sender']
+ if event.sender == sender :
+ self.debug_print ( " waypoint_sender %d" % j )
+ self.send_from_waypoint ( self.waypoints[j] )
+ return
+
+
+ def on_message(self, event):
+
+ self.debug_print ( "on_message --------------------------- " )
+
+ # Is this one of our client receivers ?
+ for i in range(len(self.receivers)) :
+ receiver = self.receivers[i]
+ if event.receiver == receiver['receiver'] :
+ receiver['n_received'] += 1
+ self.n_transitions += 1
+ self.debug_print (" client receiver %d has %d messages." % ( i, receiver['n_received'] ) )
+ message_content_number = event.message.body
+ self.n_rcvd += 1
+ if self.n_rcvd >= self.n_expected_received and self.n_thru >= self.n_expected_received:
+ self.debug_print ( "DONE -- self.n_rcvd: %d self.n_thru: %d" % ( self.n_rcvd, self.n_thru ) )
+ if self.debug :
+ self.report ( )
+ self.check_results_and_bail ( )
+ return
+
+ # Is this one of our waypoint receivers ?
+ for j in range(len(self.waypoints)) :
+ waypoint = self.waypoints[j]
+ if event.receiver == waypoint['receiver'] :
+ m = Message ( body=event.message.body )
+ waypoint [ 'queue' ].append ( m )
+ waypoint [ 'n_received' ] += 1
+ self.n_transitions += 1
+ self.debug_print ( " message received at waypoint %d, queue depth is now %d" % (j, len(waypoint['queue'])))
+ self.send_from_waypoint ( waypoint )
+
+
+
+ def check_results_and_bail ( self ) :
+
+ if self.n_expected_transitions != self.n_transitions :
+ self.bail ( "total transitions were %d, but %d were expected." % ( self.n_transitions, self.n_expected_transitions ) )
+ return
+
+ mps = self.messages_per_sender
+ n_senders = len(self.senders)
+ total_messages_sent = mps * n_senders
+
+ # For total messages sent, the expected value and
+ # the actual value must be the same. The two receivers
+ # may receive different numbers (although the total should
+ # be correct) but each of the senders must send the expected
+ # number of messages or something is wrong.
+ for i in range(n_senders) :
+ sndr = self.senders[i]
+ if sndr['n_sent'] != mps :
+ self.bail ( "sender %d sent %d messages instead of %d" % ( i, sndr['n_sent'], mps ) )
+ return
+
+ n_waypoints = len(self.waypoints)
+
+ # In this test, each message only hits one waypoint, not both.
+ # So the expected number of waypoint receptions is the same
+ # as the total number of messages sent.
+ total_expected_waypoint_receptions = total_messages_sent
+ total_actual_waypoint_receptions = 0
+
+ for i in range(n_waypoints) :
+ total_actual_waypoint_receptions += self.waypoints[i]['n_received']
+
+ if total_actual_waypoint_receptions != total_expected_waypoint_receptions :
+ self.bail ( "total waypoint receptions were %d, but %d were expected." % ( total_actual_waypoint_receptions, total_expected_waypoint_receptions) )
+ return
+
+ # Finally, our client receivers must receive one message
+ # for every one that was originally sent out by the client senders.
total_messages_received = 0
for i in range(len(self.receivers)) :
this_receiver_got = self.receivers[i]['n_received']
@@ -3562,10 +4170,8 @@ class SerialWaypointTest ( MessagingHandler ):
return
self.debug_print ( "\nsuccess\n" )
- self.bail ( None )
-
+ self.bail ( None )
-
def report ( self ) :
@@ -3588,9 +4194,10 @@ class SerialWaypointTest ( MessagingHandler ):
print "\nend report\n=========================================================\n\n"
+
def run(self):
container = Container(self)
- container.container_id = 'WaypointTest2'
+ container.container_id = self.container_id
container.run()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org