You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2017/11/29 17:34:45 UTC
qpid-dispatch git commit: DISPATCH-209 : first topology test
Repository: qpid-dispatch
Updated Branches:
refs/heads/master e494da0a9 -> 58ecc97bc
DISPATCH-209 : first topology test
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58ecc97b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58ecc97b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58ecc97b
Branch: refs/heads/master
Commit: 58ecc97bcc3d9f89f62d308d3c9b01027151f5e3
Parents: e494da0
Author: mgoulish <mg...@redhat.com>
Authored: Wed Nov 29 11:49:59 2017 -0500
Committer: mgoulish <mg...@redhat.com>
Committed: Wed Nov 29 11:49:59 2017 -0500
----------------------------------------------------------------------
tests/CMakeLists.txt | 1 +
tests/system_tests_topology.py | 609 ++++++++++++++++++++++++++++++++++++
2 files changed, 610 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index aaf65ec..0c6454c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -100,6 +100,7 @@ foreach(py_test_module
system_tests_denied_unsettled_multicast
system_tests_auth_service_plugin
system_tests_delivery_abort
+ system_tests_topology
${SYSTEM_TESTS_HTTP}
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/system_tests_topology.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py
new file mode 100644
index 0000000..5e58483
--- /dev/null
+++ b/tests/system_tests_topology.py
@@ -0,0 +1,609 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+import datetime
+import pdb
+
+
+
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+
+
+#------------------------------------------------
+# Helper classes for all tests.
+#------------------------------------------------
+
+class Timeout(object):
+ """
+ Named timeout object can handle multiple simultaneous
+ timers, by telling the parent which one fired.
+ """
+ def __init__ ( self, parent, name ):
+ self.parent = parent
+ self.name = name
+
+ def on_timer_task ( self, event ):
+ self.parent.timeout ( self.name )
+
+
+
+class ManagementMessageHelper ( object ):
+ """
+ Format management messages.
+ """
+ def __init__ ( self, reply_addr ):
+ self.reply_addr = reply_addr
+
+ def make_connector_query ( self, connector_name ):
+ props = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+ msg = Message ( properties=props, reply_to=self.reply_addr )
+ return msg
+
+ def make_connector_delete_command ( self, connector_name ):
+ props = {'operation': 'DELETE', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+ msg = Message ( properties=props, reply_to=self.reply_addr )
+ return msg
+
+
+#------------------------------------------------
+# END Helper classes for all tests.
+#------------------------------------------------
+
+
+
+
+
+#================================================================
+# Setup
+#================================================================
+
+class TopologyTests ( TestCase ):
+
+ @classmethod
+ def setUpClass(cls):
+ super(TopologyTests, cls).setUpClass()
+
+
+
+ def router(name, more_config):
+
+ config = [ ('router', {'mode': 'interior', 'id': name}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+ ] \
+ + more_config
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ A_client_port = cls.tester.get_port()
+ B_client_port = cls.tester.get_port()
+ C_client_port = cls.tester.get_port()
+ D_client_port = cls.tester.get_port()
+
+ A_inter_router_port = cls.tester.get_port()
+ B_inter_router_port = cls.tester.get_port()
+ C_inter_router_port = cls.tester.get_port()
+
+ #
+ #
+ # Topology of the 4-mesh, with costs of connections marked.
+ # Tail of arrow indicates initiator of connection.
+ #
+ # 1
+ # D ----------> A
+ # | \ > ^
+ # | 20\ 50/ |
+ # | \ / |
+ # 1 | / \ | 100
+ # | / \ |
+ # v / > |
+ # C ----------> B
+ # 1
+ #
+ # Test 1 TopologyFailover Notes
+ #
+ # 1. Messages are always sent from A, and go to B.
+ # 2. First route ahould be ADCB.
+ # 3. Then we kill connector CD.
+ # 4. Next route should be ADB.
+ # 5. Then we kill connector BD.
+ # 6. Next route should be ACB.
+ # 7. Then we kill connector BC.
+ # 8. Final route should be AB.
+
+ cls.A_B_cost = 100
+ cls.A_C_cost = 50
+ cls.A_D_cost = 1
+ cls.B_C_cost = 1
+ cls.B_D_cost = 20
+ cls.C_D_cost = 1
+
+ router ( 'A',
+ [
+ ( 'listener',
+ { 'port': A_client_port,
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': A_inter_router_port,
+ 'stripAnnotations': 'no'
+ }
+ )
+ ]
+ )
+
+
+ router ( 'B',
+ [
+ ( 'listener',
+ { 'port': B_client_port,
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': B_inter_router_port,
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AB_connector',
+ 'role': 'inter-router',
+ 'port': A_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost': cls.A_B_cost,
+ 'stripAnnotations': 'no'
+ }
+ )
+ ]
+ )
+
+
+ router ( 'C',
+ [
+ ( 'listener',
+ { 'port': C_client_port,
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': C_inter_router_port,
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AC_connector',
+ 'role': 'inter-router',
+ 'port': A_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost' : cls.A_C_cost,
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'BC_connector',
+ 'role': 'inter-router',
+ 'port': B_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost' : cls.B_C_cost,
+ 'stripAnnotations': 'no'
+ }
+ )
+ ]
+ )
+
+
+ router ( 'D',
+ [
+ ( 'listener',
+ { 'port': D_client_port,
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AD_connector',
+ 'role': 'inter-router',
+ 'port': A_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost' : cls.A_D_cost,
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'BD_connector',
+ 'role': 'inter-router',
+ 'port': B_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost' : cls.B_D_cost,
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'CD_connector',
+ 'role': 'inter-router',
+ 'port': C_inter_router_port,
+ 'verifyHostName': 'no',
+ 'cost' : cls.C_D_cost,
+ 'stripAnnotations': 'no'
+ }
+ )
+ ]
+ )
+
+
+ router_A = cls.routers[0]
+ router_B = cls.routers[1]
+ router_C = cls.routers[2]
+ router_D = cls.routers[3]
+
+ router_A.wait_router_connected('B')
+ router_A.wait_router_connected('C')
+ router_A.wait_router_connected('D')
+
+ cls.client_addrs = ( router_A.addresses[0],
+ router_B.addresses[0],
+ router_C.addresses[0],
+ router_D.addresses[0]
+ )
+
+ # 1 means skip that test.
+ cls.skip = { 'test_01' : 0
+ }
+
+
+
+ def test_01_topology_failover ( self ):
+ name = 'test_01'
+ if self.skip [ name ] :
+ self.skipTest ( "Test skipped during development." )
+ test = TopologyFailover ( name,
+ self.client_addrs,
+ "closest/01"
+ )
+ test.run()
+ self.assertEqual ( None, test.error )
+
+
+
+#================================================================
+# Tests
+#================================================================
+
+
+# Also see 'Test 1 TopologyFailover Notes', above.
+
+class TopologyFailover ( MessagingHandler ):
+ """
+ Test that the lowest-cost route is always chosen in a 4-mesh
+ network topology, as one link after another is lost.
+
+ This test also ensures that connections that have been
+ deliberately severed do no get restored.
+ """
+ def __init__ ( self, test_name, client_addrs, destination ):
+ super(TopologyFailover, self).__init__(prefetch=0)
+ self.client_addrs = client_addrs
+ self.dest = destination
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.test_timer = None
+ self.send_timer = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_accepted = 0
+ self.n_released = 0
+ self.reactor = None
+ self.state = None
+ self.send_conn = None
+ self.recv_conn = None
+ self.nap_time = 2
+ self.debug = False
+
+ # Holds the management sender, receiver, and 'helper'
+ # associated with each router.
+ self.routers = {
+ 'A' : dict(),
+ 'B' : dict(),
+ 'C' : dict(),
+ 'D' : dict()
+ }
+
+ # These are the expectes routing traces, in the order we
+ # expect to receive them.
+ self.expected_traces = [
+ [u'0/A', u'0/D', u'0/C', u'0/B'],
+ [u'0/A', u'0/D', u'0/B'],
+ [u'0/A', u'0/C', u'0/B'],
+ [u'0/A', u'0/B']
+ ]
+ self.trace_count = 0
+
+ # This tells the system in what order to kill the connectors.
+ self.kill_list = (
+ ( 'D', 'CD_connector' ),
+ ( 'D', 'BD_connector' ),
+ ( 'C', 'BC_connector' )
+ )
+
+ # Use this to keep track of which connectors we have found
+ # when the test is first getting started and we are checking
+ # the topology.
+ self.connectors_map = { 'AB_connector' : 0,
+ 'AC_connector' : 0,
+ 'AD_connector' : 0,
+ 'BC_connector' : 0,
+ 'BD_connector' : 0,
+ 'CD_connector' : 0
+ }
+
+
+ # The simple state machine transitions when certain events happen,
+ # if certain conditions are met. The conditions are checked for
+ # by the callbacks for the events.
+ # The normal sequence of states in the state machine is:
+ # 1. starting -- doesn't do anything
+ # 2. checking -- checks initial topology
+ # 3. examine_trace -- look at routing trace of first message
+ # 4. kill_connector -- kills the first connector (CD)
+ # 5. examine_trace -- checks routing trace of next message
+ # 5. kill_connector -- kills the next connector (BD)
+ # 5. examine_trace -- checks routing trace of next message
+ # 5. kill_connector -- kills the next connector (BC)
+ # 5. examine_trace -- checks routing trace of final message
+ # 5. bailing -- bails out with success
+
+
+ def state_transition ( self, message, new_state ) :
+ if self.state == new_state :
+ return
+ self.state = new_state
+ self.debug_print ( "state transition to : %s -- because %s" % ( self.state, message ) )
+
+
+ def debug_print ( self, text ) :
+ if self.debug == True:
+ print time.time(), text
+
+
+ # Shut down everything and exit.
+ def bail ( self, text ):
+ self.error = text
+
+ self.send_conn.close ( )
+ self.recv_conn.close ( )
+
+ self.routers['B'] ['mgmt_conn'].close()
+ self.routers['C'] ['mgmt_conn'].close()
+ self.routers['D'] ['mgmt_conn'].close()
+
+ self.test_timer.cancel ( )
+ self.send_timer.cancel ( )
+
+
+ #------------------------------------------------------------------------
+ # I want some behavior from this test that is a little too complex
+ # to be governed by the usual callback functions. The way I do this
+ # is by making a simple state machine that checks some conditions
+ # during some callback, and then either steps forward or terminates
+ # the test.
+ # The callbacks that activate the state machine are mostly on_message,
+ # or timeout. But there are two different timers: the one-second
+ # timer that mostly runs the test, and the 60-second timer that, if it
+ # fires, will terminate the test with a timeout error.
+ #------------------------------------------------------------------------
+ def timeout ( self, name ):
+ if name == 'test':
+ self.set_state ( 'Timeout Expired', 'bailing' )
+ self.bail ( "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+ ( self.n_sent, self.n_received, self.n_accepted ) )
+ elif name == 'sender':
+ if self.state == 'examine_trace' :
+ self.send ( )
+ self.send_timer = self.reactor.schedule(1, Timeout(self, "sender"))
+
+
+ def on_start ( self, event ):
+ self.state_transition ( 'on_start', 'starting' )
+ self.reactor = event.reactor
+ self.test_timer = event.reactor.schedule ( 60, Timeout(self, "test") )
+ self.send_timer = event.reactor.schedule ( 1, Timeout(self, "sender") )
+ self.send_conn = event.container.connect ( self.client_addrs[0] ) # A
+ self.recv_conn = event.container.connect ( self.client_addrs[1] ) # B
+
+ self.sender = event.container.create_sender ( self.send_conn, self.dest )
+ self.receiver = event.container.create_receiver ( self.recv_conn, self.dest )
+ self.receiver.flow ( 100 )
+
+ # I will only send management messages to B, C, and D, because
+ # they are the owners of the connections that I will want to delete.
+ self.routers['B'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[1] )
+ self.routers['C'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[2] )
+ self.routers['D'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[3] )
+
+ self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True )
+ self.routers['C'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['C'] ['mgmt_conn'], dynamic=True )
+ self.routers['D'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['D'] ['mgmt_conn'], dynamic=True )
+
+ self.routers['B'] ['mgmt_sender'] = event.container.create_sender ( self.routers['B'] ['mgmt_conn'], "$management" )
+ self.routers['C'] ['mgmt_sender'] = event.container.create_sender ( self.routers['C'] ['mgmt_conn'], "$management" )
+ self.routers['D'] ['mgmt_sender'] = event.container.create_sender ( self.routers['D'] ['mgmt_conn'], "$management" )
+
+
+
+ #-----------------------------------------------------------------
+ # At start-time, as the links to the three managed routers
+ # open, check each one to make sure that it has all the expected
+ # connections.
+ #-----------------------------------------------------------------
+ def on_link_opened ( self, event ) :
+ self.state_transition ( 'on_link_opened', 'checking' )
+ # The B mgmt link has opened. Check its connections. --------------------------
+ if event.receiver == self.routers['B'] ['mgmt_receiver'] :
+ event.receiver.flow ( 1000 )
+ self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+ for connector in [ 'AB_connector' ] :
+ self.connector_check ( 'B', connector )
+ # The C mgmt link has opened. Check its connections. --------------------------
+ elif event.receiver == self.routers['C'] ['mgmt_receiver'] :
+ event.receiver.flow ( 1000 )
+ self.routers['C'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+ for connector in [ 'AC_connector', 'BC_connector' ] :
+ self.connector_check ( 'C', connector )
+ # The D mgmt link has opened. Check its connections. --------------------------
+ elif event.receiver == self.routers['D'] ['mgmt_receiver']:
+ event.receiver.flow ( 1000 )
+ self.routers['D'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+ for connector in [ 'AD_connector', 'BD_connector', 'CD_connector' ] :
+ self.connector_check ( 'D', connector )
+
+
+ def send ( self ):
+ n_sent_this_time = 0
+ if self.sender.credit <= 0:
+ self.receiver.flow ( 100 )
+ return
+ # Send messages one at a time.
+ if self.sender.credit > 0 :
+ msg = Message ( body=self.n_sent )
+ self.sender.send ( msg )
+ n_sent_this_time += 1
+ self.n_sent += 1
+ self.debug_print ( "sent: %d" % self.n_sent )
+
+
+ def on_message ( self, event ):
+
+ if event.receiver == self.routers['B'] ['mgmt_receiver'] or \
+ event.receiver == self.routers['C'] ['mgmt_receiver'] or \
+ event.receiver == self.routers['D'] ['mgmt_receiver'] :
+
+ #----------------------------------------------------------------
+ # This is a management message.
+ #----------------------------------------------------------------
+ if self.state == 'checking' :
+ connection_name = event.message.body['name']
+
+ if connection_name in self.connectors_map :
+ self.connectors_map [ connection_name ] = 1
+ else :
+ self.state_transition ( "bad connection name: %s" % connection_name, 'bailing' )
+ self.bail ( "bad connection name: %s" % connection_name )
+
+ n_connections = sum(self.connectors_map.values())
+ if n_connections == 6 :
+ self.state_transition ( "all %d connections found" % n_connections, 'examine_trace' )
+ elif self.state == 'kill_connector' :
+ if event.message.properties["statusDescription"] == 'No Content':
+ # We are in the process of killing a connector, and
+ # have received the response to the kill message.
+ self.state_transition ( 'got kill response', 'examine_trace' )
+ # This sleep is here because one early bug that this test found
+ # (and which is now fixed) involved connections that had been
+ # deleted coming back sometimes. It was a race and only happened
+ # very occasionally -- but with a pause here, after getting
+ # confirmation that we have successfully deleted the connector,
+ # the bug would show up 60 to 75% of the time. I think that leaving
+ # this sleep here is the only way to ensure that that particular
+ # bug stays fixed.
+ time.sleep ( self.nap_time )
+ else:
+ #----------------------------------------------------------------
+ # This is a payload message.
+ #----------------------------------------------------------------
+ self.n_received += 1
+ if self.state == 'examine_trace' :
+ trace = event.message.annotations [ 'x-opt-qd.trace' ]
+ expected = self.expected_traces [ self.trace_count ]
+ if trace == expected :
+ if self.trace_count == len(self.expected_traces) - 1 :
+ self.state_transition ( 'final expected trace %s observed' % expected, 'bailing' )
+ self.bail ( None )
+ return
+ self.state_transition ( "expected trace %d observed successfully %s" % ( self.trace_count, expected ) , 'kill_connector' )
+ self.kill_a_connector ( self.kill_list[self.trace_count] )
+ self.trace_count += 1
+ else :
+ self.state_transition ( "expected trace %s but got %s" % ( expected, trace ), 'bailing' )
+ self.bail ( "expected trace %s but got %s" % ( expected, trace ) )
+
+
+ def on_accepted ( self, event ):
+ self.n_accepted += 1
+
+
+ def on_released ( self, event ) :
+ self.n_released += 1
+
+
+ def connector_check ( self, router, connector ) :
+ self.debug_print ( "checking connector for router %s" % router )
+ mgmt_helper = self.routers[router] ['mgmt_helper']
+ mgmt_sender = self.routers[router] ['mgmt_sender']
+ msg = mgmt_helper.make_connector_query ( connector )
+ mgmt_sender.send ( msg )
+
+
+ def kill_a_connector ( self, target ) :
+ router = target[0]
+ connector = target[1]
+ self.debug_print ( "killing connector %s on router %s" % (connector, router) )
+ mgmt_helper = self.routers[router] ['mgmt_helper']
+ mgmt_sender = self.routers[router] ['mgmt_sender']
+ msg = mgmt_helper.make_connector_delete_command ( connector )
+ mgmt_sender.send ( msg )
+
+
+
+ def run(self):
+ Container(self).run()
+
+
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org