You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2018/03/07 08:38:30 UTC
qpid-dispatch git commit: DISPATCH-209 : test disposition guarantee
with spurious connection loss
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 7e16c4e87 -> 35b1c3f83
DISPATCH-209 : test disposition guarantee with spurious connection loss
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/35b1c3f8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/35b1c3f8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/35b1c3f8
Branch: refs/heads/master
Commit: 35b1c3f83427e4fefa19a701650f0b2a8f49185a
Parents: 7e16c4e
Author: mgoulish <mg...@redhat.com>
Authored: Wed Mar 7 03:36:02 2018 -0500
Committer: mgoulish <mg...@redhat.com>
Committed: Wed Mar 7 03:36:02 2018 -0500
----------------------------------------------------------------------
tests/system_tests_topology_disposition.py | 440 +++++++++++++++++++-----
1 file changed, 351 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/35b1c3f8/tests/system_tests_topology_disposition.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology_disposition.py b/tests/system_tests_topology_disposition.py
index 03812ea..ba15009 100644
--- a/tests/system_tests_topology_disposition.py
+++ b/tests/system_tests_topology_disposition.py
@@ -42,9 +42,19 @@ except ImportError:
-#------------------------------------------------
+#================================================
# Helper classes for all tests.
-#------------------------------------------------
+#================================================
+
+class Stopwatch ( object ) :
+
+ def __init__ ( self, name, timer, initial_time, repeat_time ) :
+ self.name = name
+ self.timer = timer
+ self.initial_time = initial_time
+ self.repeat_time = repeat_time
+
+
class Timeout(object):
"""
@@ -78,11 +88,11 @@ class ManagementMessageHelper ( object ) :
return msg
def make_router_link_query ( self ) :
- props = { 'count': '100',
- 'operation': 'QUERY',
- 'entityType': 'org.apache.qpid.dispatch.router.link',
- 'name': 'self',
- 'type': 'org.amqp.management'
+ props = { 'count': '100',
+ 'operation': 'QUERY',
+ 'entityType': 'org.apache.qpid.dispatch.router.link',
+ 'name': 'self',
+ 'type': 'org.amqp.management'
}
attrs = []
attrs.append ( unicode('linkType') )
@@ -102,9 +112,10 @@ class ManagementMessageHelper ( object ) :
return Message ( body=msg_body, properties=props, reply_to=self.reply_addr )
-#------------------------------------------------
+
+#================================================
# END Helper classes for all tests.
-#------------------------------------------------
+#================================================
@@ -114,13 +125,23 @@ class ManagementMessageHelper ( object ) :
# Setup
#================================================================
+
class TopologyDispositionTests ( TestCase ):
+ """
+ The disposition guarantee is that the sender should shortly know
+ how its messages have been disposed: whether they have been
+ accepted, released, or modified.
+ These tests ensure that the disposition guarantee survives
+ disruptions in router network topology.
+ """
@classmethod
def setUpClass(cls):
super(TopologyDispositionTests, cls).setUpClass()
+ cls.routers = []
+
def router(name, more_config):
@@ -135,16 +156,17 @@ class TopologyDispositionTests ( TestCase ):
cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
- cls.routers = []
- A_client_port = cls.tester.get_port()
- B_client_port = cls.tester.get_port()
- C_client_port = cls.tester.get_port()
- D_client_port = cls.tester.get_port()
+ client_ports = dict()
+ client_ports [ 'A' ] = cls.tester.get_port()
+ client_ports [ 'B' ] = cls.tester.get_port()
+ client_ports [ 'C' ] = cls.tester.get_port()
+ client_ports [ 'D' ] = cls.tester.get_port()
- A_inter_router_port = cls.tester.get_port()
- B_inter_router_port = cls.tester.get_port()
- C_inter_router_port = cls.tester.get_port()
+ inter_router_ports = dict()
+ inter_router_ports [ 'A' ] = cls.tester.get_port()
+ inter_router_ports [ 'B' ] = cls.tester.get_port()
+ inter_router_ports [ 'C' ] = cls.tester.get_port()
#
#
@@ -164,12 +186,17 @@ class TopologyDispositionTests ( TestCase ):
# 1
#
- cls.A_B_cost = 100
- cls.A_C_cost = 50
- cls.A_D_cost = 1
- cls.B_C_cost = 1
- cls.B_D_cost = 20
- cls.C_D_cost = 1
+ cls.cost = dict()
+ cls.cost [ 'AB' ] = 100
+ cls.cost [ 'AC' ] = 50
+ cls.cost [ 'AD' ] = 1
+ cls.cost [ 'BC' ] = 1
+ cls.cost [ 'BD' ] = 20
+ cls.cost [ 'CD' ] = 1
+
+ # Add an extra, high-cost connection between A and D.
+ # This will be deleted in the first test.
+ cls.cost [ 'AD2' ] = 11
client_link_capacity = 1000
inter_router_link_capacity = 1000
@@ -177,7 +204,7 @@ class TopologyDispositionTests ( TestCase ):
router ( 'A',
[
( 'listener',
- { 'port': A_client_port,
+ { 'port': client_ports['A'],
'role': 'normal',
'stripAnnotations': 'no',
'linkCapacity' : client_link_capacity
@@ -185,7 +212,7 @@ class TopologyDispositionTests ( TestCase ):
),
( 'listener',
{ 'role': 'inter-router',
- 'port': A_inter_router_port,
+ 'port': inter_router_ports [ 'A' ],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -197,7 +224,7 @@ class TopologyDispositionTests ( TestCase ):
router ( 'B',
[
( 'listener',
- { 'port': B_client_port,
+ { 'port': client_ports['B'],
'role': 'normal',
'stripAnnotations': 'no',
'linkCapacity' : client_link_capacity
@@ -205,7 +232,7 @@ class TopologyDispositionTests ( TestCase ):
),
( 'listener',
{ 'role': 'inter-router',
- 'port': B_inter_router_port,
+ 'port': inter_router_ports [ 'B' ],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -214,9 +241,9 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'AB_connector',
'role': 'inter-router',
- 'port': A_inter_router_port,
+ 'port': inter_router_ports [ 'A' ],
'verifyHostName': 'no',
- 'cost': cls.A_B_cost,
+ 'cost': cls.cost['AB'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -228,7 +255,7 @@ class TopologyDispositionTests ( TestCase ):
router ( 'C',
[
( 'listener',
- { 'port': C_client_port,
+ { 'port': client_ports['C'],
'role': 'normal',
'stripAnnotations': 'no',
'linkCapacity' : client_link_capacity
@@ -236,7 +263,7 @@ class TopologyDispositionTests ( TestCase ):
),
( 'listener',
{ 'role': 'inter-router',
- 'port': C_inter_router_port,
+ 'port': inter_router_ports [ 'C' ],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -244,9 +271,9 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'AC_connector',
'role': 'inter-router',
- 'port': A_inter_router_port,
+ 'port': inter_router_ports [ 'A' ],
'verifyHostName': 'no',
- 'cost' : cls.A_C_cost,
+ 'cost' : cls.cost['AC'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -254,9 +281,9 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'BC_connector',
'role': 'inter-router',
- 'port': B_inter_router_port,
+ 'port': inter_router_ports [ 'B' ],
'verifyHostName': 'no',
- 'cost' : cls.B_C_cost,
+ 'cost' : cls.cost['BC'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -268,7 +295,7 @@ class TopologyDispositionTests ( TestCase ):
router ( 'D',
[
( 'listener',
- { 'port': D_client_port,
+ { 'port': client_ports['D'],
'role': 'normal',
'stripAnnotations': 'no',
'linkCapacity' : client_link_capacity
@@ -277,9 +304,19 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'AD_connector',
'role': 'inter-router',
- 'port': A_inter_router_port,
+ 'port': inter_router_ports [ 'A' ],
+ 'verifyHostName': 'no',
+ 'cost' : cls.cost['AD'],
+ 'stripAnnotations': 'no',
+ 'linkCapacity' : inter_router_link_capacity
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AD2_connector',
+ 'role': 'inter-router',
+ 'port': inter_router_ports [ 'A' ],
'verifyHostName': 'no',
- 'cost' : cls.A_D_cost,
+ 'cost' : cls.cost['AD2'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -287,9 +324,9 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'BD_connector',
'role': 'inter-router',
- 'port': B_inter_router_port,
+ 'port': inter_router_ports [ 'B' ],
'verifyHostName': 'no',
- 'cost' : cls.B_D_cost,
+ 'cost' : cls.cost['BD'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -297,9 +334,9 @@ class TopologyDispositionTests ( TestCase ):
( 'connector',
{ 'name': 'CD_connector',
'role': 'inter-router',
- 'port': C_inter_router_port,
+ 'port': inter_router_ports [ 'C' ],
'verifyHostName': 'no',
- 'cost' : cls.C_D_cost,
+ 'cost' : cls.cost['CD'],
'stripAnnotations': 'no',
'linkCapacity' : inter_router_link_capacity
}
@@ -317,60 +354,285 @@ class TopologyDispositionTests ( TestCase ):
router_A.wait_router_connected('C')
router_A.wait_router_connected('D')
- cls.client_addrs = ( router_A.addresses[0],
- router_B.addresses[0],
- router_C.addresses[0],
- router_D.addresses[0]
- )
+ cls.client_addrs = dict()
+ cls.client_addrs['A'] = router_A.addresses[0]
+ cls.client_addrs['B'] = router_B.addresses[0]
+ cls.client_addrs['C'] = router_C.addresses[0]
+ cls.client_addrs['D'] = router_D.addresses[0]
# 1 means skip that test.
- cls.skip = { 'test_01' : 0
+ cls.skip = { 'test_01' : 0,
+ 'test_02' : 0
}
-
- def test_01_topology_disposition ( self ):
+ def test_01_delete_spurious_connector ( self ):
name = 'test_01'
if self.skip [ name ] :
self.skipTest ( "Test skipped during development." )
+ test = DeleteSpuriousConnector ( name,
+ self.client_addrs,
+ 'closest/01',
+ self.client_addrs['D']
+ )
+ test.run()
+ self.assertEqual ( None, test.error )
+
+
+ def test_02_topology_disposition ( self ):
+ name = 'test_02'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
test = TopologyDisposition ( name,
self.client_addrs,
- "closest/01"
+ "closest/02"
)
test.run()
self.assertEqual ( None, test.error )
-#================================================================
+
+#################################################################
# Tests
-#================================================================
+#################################################################
+
+
+class DeleteSpuriousConnector ( MessagingHandler ):
+ """
+ """
+ def __init__ ( self, test_name, client_addrs, destination, D_client_addr ):
+ super ( DeleteSpuriousConnector, self).__init__(prefetch=100)
+ self.test_name = test_name
+ self.client_addrs = client_addrs
+ self.D_client_addr = D_client_addr
+ self.dest = destination
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.debug = False
+
+ self.n_messages = 30
+ self.n_received = 0
+ self.n_accepted = 0
+ self.n_released = 0
+ self.n_sent = 0
+
+ self.burst_size = 3
+ self.timers = dict()
+ self.reactor = None
+ self.bailing = False
+ self.sender_connection = None
+ self.receiver_connection = None
+ self.connections = []
+
+ self.D_management_connection = None
+ self.D_management_receiver = None
+ self.D_management_sender = None
+ self.D_management_helper = None
+
+ self.confirmed_kill = False
+
+
+ def debug_print ( self, text ) :
+ if self.debug == True:
+ print "%.6lf %s" % ( time.time(), text )
+
+
+ # Shut down everything and exit.
+ def bail ( self, text ):
+ self.bailing = True
+ self.error = text
+
+ for stopwatch in self.timers.values() :
+ stopwatch.timer.cancel()
+
+ for cnx in self.connections :
+ cnx.close ( )
+
+
+ # Call this from all handlers of dispositions returning to the sender.
+ def bail_out_if_done ( self ) :
+ if self.n_accepted + self.n_released >= self.n_messages :
+ # We have received everything. But! Did we get a confirmed kill on the connector?
+ if not self.confirmed_kill :
+ # This is a failure.
+ self.bail ( "No confirmed kill on connector." )
+ else :
+ # Success!
+ self.bail ( None )
+
+
+ def timeout ( self, timer_name ):
+ # If we are in the process of punching out, ignore all other timers.
+ if self.bailing :
+ return
+
+ self.debug_print ( "timeout %s" % timer_name )
+
+ # If this is the doomsday timer, just punch out.
+ if timer_name == 'test' :
+ self.bail ( None )
+ return
+
+ # Timer-specific actions.
+ if timer_name == 'sender' :
+ self.send ( )
+
+ # Generic actions for all timers.
+ stopwatch = self.timers[timer_name]
+ stopwatch.timer = self.reactor.schedule ( stopwatch.repeat_time, Timeout(self, timer_name) )
+
+
+ def on_start ( self, event ):
+ self.reactor = event.reactor
+
+ # This stopwatch will end the test.
+ stopwatch_name='test'
+ init_time=60
+ self.timers[stopwatch_name] = \
+ Stopwatch ( name = stopwatch_name, \
+ timer = event.reactor.schedule(init_time, Timeout(self, stopwatch_name)), \
+ initial_time = init_time, \
+ repeat_time = 0 \
+ )
+
+ # This stopwatch calls the sender.
+ stopwatch_name='sender'
+ init_time=2
+ self.timers[stopwatch_name] = \
+ Stopwatch ( name = stopwatch_name, \
+ timer = event.reactor.schedule(init_time, Timeout(self, stopwatch_name)), \
+ initial_time = init_time, \
+ repeat_time = 0.5 \
+ )
+
+ self.sender_connection = event.container.connect ( self.client_addrs['A'] )
+ self.receiver_connection = event.container.connect ( self.client_addrs['B'] )
+ self.connections.append ( self.sender_connection )
+ self.connections.append ( self.receiver_connection )
+
+ self.sender = event.container.create_sender ( self.sender_connection, self.dest, name='sender' )
+ self.receiver = event.container.create_receiver ( self.receiver_connection, self.dest, name='receiver' )
+
+ # In this test, we send a single management command to the D router
+ # to kill a 'spurious', i.e. unused, connector.
+ self.D_management_connection = event.container.connect ( self.D_client_addr )
+ self.D_management_receiver = event.container.create_receiver ( self.D_management_connection, dynamic=True )
+ self.D_management_sender = event.container.create_sender ( self.D_management_connection, "$management" )
+ self.connections.append ( self.D_management_connection )
+
+
+ def on_link_opened ( self, event ) :
+ self.debug_print ( "on_link_opened" )
+ if event.receiver:
+ event.receiver.flow ( self.n_messages )
+
+ if event.receiver == self.D_management_receiver :
+ event.receiver.flow ( 100 )
+ self.D_management_helper = ManagementMessageHelper ( event.receiver.remote_source.address )
+
+
+ def on_released ( self, event ) :
+ # The test fails.
+ self.bail ( "a message was released" )
+
+
+ def run(self):
+ Container(self).run()
+
+
+ def kill_the_connector ( self ) :
+ router = 'D'
+ connector = 'AD2_connector'
+ self.debug_print ( "killing connector %s on router %s" % (connector, router) )
+ msg = self.D_management_helper.make_connector_delete_command ( connector )
+ self.debug_print ( "killing connector %s on router %s" % (connector, router) )
+ self.D_management_sender.send ( msg )
+ self.most_recent_kill = time.time()
+
+
+ def parse_link_query_response ( self, msg ) :
+ if msg.properties :
+ if "statusDescription" in msg.properties and "statusCode" in msg.properties :
+ if msg.properties["statusDescription"] == "No Content" and msg.properties["statusCode"] == 204 :
+ self.debug_print ( "AD2_connector was killed" )
+ self.confirmed_kill = True
+
+
+ #=======================================================
+ # Sender Side
+ #=======================================================
+
+ def send ( self ) :
+
+ if self.n_sent >= self.n_messages :
+ return
+
+ for _ in xrange ( self.burst_size ) :
+ if self.sender.credit > 0 and self.n_sent < self.n_messages :
+ msg = Message ( body=self.n_sent )
+ self.n_sent += 1
+ self.sender.send ( msg )
+ self.debug_print ( "sent: %d" % self.n_sent )
+ else :
+ self.debug_print ( "sender has no credit." )
+
+
+ def on_accepted ( self, event ):
+ self.n_accepted += 1
+ self.debug_print ( "on_accepted %d" % self.n_accepted )
+ self.bail_out_if_done ( )
+
+
+ def on_released ( self, event ) :
+ self.n_released += 1
+ self.debug_print ( "on_released %d" % self.n_released )
+ self.bail_out_if_done ( )
+
+
+ #=======================================================
+ # Receiver Side
+ #=======================================================
+
+ def on_message ( self, event ):
+ if event.receiver == self.D_management_receiver :
+ self.parse_link_query_response ( event.message )
+ else :
+ self.n_received += 1
+ self.debug_print ( "received message %s" % event.message.body )
+ self.debug_print ( "n_received == %d" % self.n_received )
+ if self.n_received == 13 :
+ self.kill_the_connector ( )
+
+
class TopologyDisposition ( MessagingHandler ):
"""
+ Test that disposition guarantee survives catastrophic
+ damage to the router network.
"""
-
# TopologyDisposition Notes
# ========================================
#
# 1. What is the goal of this test?
# ------------------------------------------
- # The point of this test is to make sure that, in spite of
+ # The point of this test is to make sure that, in spite of
# serious disruption to a complex router network topology,
# the sender always knows the dispositions of its messages.
- # By the end of the test, it should know that all sent
- # messages were either received or released.
+ # By the end of the test, it should know that all sent
+ # messages were either received or released.
# ( Note that some messages may be "modified", but the reactor
- # interface that this test uses issues on_released events for
- # modified messages, same as released, so I am lumping them
+ # interface that this test uses issues on_released events for
+ # modified messages, same as released, so I am lumping them
# together.
#
#
# 2. Routes and Connector Kills
# ------------------------------------------
# Messages are always sent from A, and received at B.
- # Routes are contrtolled by assigning different costs to the
+ # Routes are controlled by assigning different costs to the
# various links, and by then killing 3 connectors one at a timee,
# at different points in the test.
# First route ahould be ADCB.
@@ -385,21 +647,21 @@ class TopologyDisposition ( MessagingHandler ):
# 3. Two Timers
# ------------------------------------------
# Sending is done in batches, using a timer. The timer expires
- # once every 0.5 seconds, and we send a small batch of 10 messages
- # (or as many as the sender has credit for).
- # There is also a deadline timer that terminates the test with
+ # once every 0.5 seconds, and we send a small batch of 10 messages
+ # (or as many as the sender has credit for).
+ # There is also a deadline timer that terminates the test with
# failure if it ever fires.
- #
+ #
#
# 4. The Simple State Machine
# ------------------------------------------
# I want behavior that is a little more complex than what would
# be possible by simply reacting to the callback functions, so I
# impose on top of them a simple state machine. The states proceed
- # in a simple linear sequence, and some of the callback functions
+ # in a simple linear sequence, and some of the callback functions
# consult the current state befoe deciding what they should do.
# And bump the state machine to its next state when appropriate.
- #
+ #
# state purpose
# ----------------------------------------
#
@@ -410,13 +672,13 @@ class TopologyDisposition ( MessagingHandler ):
#
# link checking visual inspection of various link data
# during debugging.
- #
+ #
# sending send the messages, in 70 batches of 10,
# spaced 0.5 seconds apart.
- #
+ #
# done sending quite sending messages and wait for either
- # the sum of ACCEPTED + RELEASED to add up to
- # SENT, causing the test to succeed, or the
+ # the sum of ACCEPTED + RELEASED to add up to
+ # SENT, causing the test to succeed, or the
# test timer to expire, causing the test to fail.
#
# bailing enter this state when we are in the process of
@@ -428,7 +690,7 @@ class TopologyDisposition ( MessagingHandler ):
# ----------------------------------
# When the send-timer goes off, I send a burst of messages ( self.send_burst_size ).
# There is no especially great reason for this, except that I liked the idea of a
- # send timer because it seemed more realistic to me -- more like a real application --
+ # send timer because it seemed more realistic to me -- more like a real application --
# and that implies sending bursts of messages.
@@ -550,11 +812,11 @@ class TopologyDisposition ( MessagingHandler ):
self.debug_print ( "sent: %d received: %d accepted: %d released: %d confirmed kills: %d" % \
( self.n_sent, self.n_received, self.n_accepted, self.n_released, self.confirmed_kills ) )
- diff = self.n_sent - (self.n_accepted + self.n_released)
+ diff = self.n_sent - (self.n_accepted + self.n_released)
- # If the difference between n_sent and (accepted + released) is
- # ever greater than 10 (the send batch size)
- if diff >= self.send_burst_size and self.state == 'done sending' :
+ # If the difference between n_sent and (accepted + released) is
+ # ever greater than 10 (the send batch size)
+ if diff >= self.send_burst_size and self.state == 'done sending' :
self.debug_print ( "TROUBLE : %d" % diff )
if self.first_trouble == 0:
@@ -563,12 +825,12 @@ class TopologyDisposition ( MessagingHandler ):
else:
trouble_duration = time.time() - self.first_trouble
self.debug_print ( "trouble duration %.6lf" % trouble_duration )
- if trouble_duration >= self.max_trouble_duration :
+ if trouble_duration >= self.max_trouble_duration :
self.state_transition ( 'trouble duration exceeded limit: %d' % self.max_trouble_duration, 'post mortem' )
self.check_links ( )
self.send_timer = self.reactor.schedule ( self.send_interval, Timeout(self, "sender") )
-
+
def on_start ( self, event ):
@@ -576,16 +838,16 @@ class TopologyDisposition ( MessagingHandler ):
self.reactor = event.reactor
self.test_timer = event.reactor.schedule ( self.deadline, Timeout(self, "test") )
self.send_timer = event.reactor.schedule ( self.send_interval, Timeout(self, "sender") )
- self.send_conn = event.container.connect ( self.client_addrs[0] ) # A
- self.recv_conn = event.container.connect ( self.client_addrs[1] ) # B
+ self.send_conn = event.container.connect ( self.client_addrs['A'] )
+ self.recv_conn = event.container.connect ( self.client_addrs['B'] )
self.sender = event.container.create_sender ( self.send_conn, self.dest )
self.receiver = event.container.create_receiver ( self.recv_conn, self.dest )
- self.routers['A'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[0] )
- self.routers['B'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[1] )
- self.routers['C'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[2] )
- self.routers['D'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[3] )
+ self.routers['A'] ['mgmt_conn'] = event.container.connect ( self.client_addrs['A'] )
+ self.routers['B'] ['mgmt_conn'] = event.container.connect ( self.client_addrs['B'] )
+ self.routers['C'] ['mgmt_conn'] = event.container.connect ( self.client_addrs['C'] )
+ self.routers['D'] ['mgmt_conn'] = event.container.connect ( self.client_addrs['D'] )
self.routers['A'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['A'] ['mgmt_conn'], dynamic=True )
self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True )
@@ -600,7 +862,7 @@ class TopologyDisposition ( MessagingHandler ):
#-----------------------------------------------------------------
- # At start-time, as the management links to the routers open,
+ # At start-time, as the management links to the routers open,
# check each one to make sure that it has all the expected
# connections.
#-----------------------------------------------------------------
@@ -661,7 +923,7 @@ class TopologyDisposition ( MessagingHandler ):
event.receiver == self.routers['D'] ['mgmt_receiver'] :
if self.state == 'topo checking' :
- # In the 'topo checking' state, we send management messages to
+ # In the 'topo checking' state, we send management messages to
# ask the 4 routers about their connections. Then, parsing the
# replies, we make sure that we count the expected 6 connections.
# (The 4 routers are completely connected.)
@@ -681,8 +943,8 @@ class TopologyDisposition ( MessagingHandler ):
elif self.state == 'link checking' or self.state == 'post mortem' :
# Link checking was used during initial debugging of this test,
- # to visually check on the number of undelivered and unsettled
- # messages in each link, especially during the "post mortem"
+ # to visually check on the number of undelivered and unsettled
+ # messages in each link, especially during the "post mortem"
# state triggered by a failure.
if event.receiver == self.routers['A'] ['mgmt_receiver'] :
self.debug_print ( "received link check message from A ------------" )
@@ -752,7 +1014,7 @@ class TopologyDisposition ( MessagingHandler ):
mgmt_sender.send ( msg )
- # The target structure provides the name of the router and the name of its connector
+ # The target structure provides the name of the router and the name of its connector
# that is to be killed. Create the appropriate management message, and send it off.
def kill_a_connector ( self, target ) :
router = target[0]
@@ -760,7 +1022,7 @@ class TopologyDisposition ( MessagingHandler ):
mgmt_helper = self.routers[router] ['mgmt_helper']
mgmt_sender = self.routers[router] ['mgmt_sender']
msg = mgmt_helper.make_connector_delete_command ( connector )
- self.debug_print ( "!!!!!\nkilling connector %s on router %s \n!!!!!" % (connector, router) )
+ self.debug_print ( "killing connector %s on router %s" % (connector, router) )
mgmt_sender.send ( msg )
self.most_recent_kill = time.time()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org