You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2018/02/28 17:01:18 UTC
qpid-dispatch git commit: DISPATCH-209 : additive topology tests
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 1134dbae4 -> 2a5633c43
DISPATCH-209 : additive topology tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2a5633c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2a5633c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2a5633c4
Branch: refs/heads/master
Commit: 2a5633c431c1769ba45eb7d42a714e24c8f70ece
Parents: 1134dba
Author: mgoulish <mg...@redhat.com>
Authored: Wed Feb 28 11:58:30 2018 -0500
Committer: mgoulish <mg...@redhat.com>
Committed: Wed Feb 28 11:58:30 2018 -0500
----------------------------------------------------------------------
tests/CMakeLists.txt | 1 +
tests/system_tests_topology_addition.py | 466 +++++++++++++++++++++++++++
2 files changed, 467 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2a5633c4/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 1d4d1c8..ee55728 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -105,6 +105,7 @@ foreach(py_test_module
system_tests_delivery_abort
system_tests_topology
system_tests_topology_disposition
+ system_tests_topology_addition
${SYSTEM_TESTS_HTTP}
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2a5633c4/tests/system_tests_topology_addition.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology_addition.py b/tests/system_tests_topology_addition.py
new file mode 100644
index 0000000..9feab29
--- /dev/null
+++ b/tests/system_tests_topology_addition.py
@@ -0,0 +1,466 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+import datetime
+
+
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+#====================================================
+# Helper classes for all tests.
+#====================================================
+
+
+# Named timers allow test code to distinguish between several
+# simultaneous timers, going off at different rates.
+class Timeout ( object ):
+ def __init__(self, parent, name):
+ self.parent = parent
+ self.name = name
+
+ def on_timer_task(self, event):
+ self.parent.timeout ( self.name )
+
+
+
+#================================================================
+# Setup
+#================================================================
+
+class TopologyAdditionTests ( TestCase ):
+
+ @classmethod
+ def setUpClass(cls):
+ super(TopologyAdditionTests, cls).setUpClass()
+
+ def router ( name, more_config ):
+
+ config = [ ('router', {'mode': 'interior', 'id': name}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+ ] \
+ + more_config
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ client_ports = dict()
+ client_ports['A'] = cls.tester.get_port()
+ client_ports['B'] = cls.tester.get_port()
+ client_ports['C'] = cls.tester.get_port()
+ client_ports['D'] = cls.tester.get_port()
+
+ cls.inter_router_ports = dict()
+ cls.inter_router_ports['A'] = cls.tester.get_port()
+ cls.inter_router_ports['B'] = cls.tester.get_port()
+
+ # Only routers A and B are set up initially by this class.
+ # Routers C and D are started by the test itself.
+ router_A_config = [
+ ( 'listener',
+ { 'port': client_ports['A'],
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': cls.inter_router_ports['A']
+ }
+ )
+ ]
+
+ router_B_config = [
+ ( 'listener',
+ { 'port': client_ports['B'],
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': cls.inter_router_ports['B'],
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AB_connector',
+ 'role': 'inter-router',
+ 'port': cls.inter_router_ports['A'],
+ 'verifyHostName': 'no',
+ 'cost': 12,
+ 'stripAnnotations': 'no'
+ }
+ )
+ ]
+
+ router ( 'A', router_A_config )
+ router ( 'B', router_B_config )
+
+ router_A = cls.routers[0]
+ router_B = cls.routers[1]
+
+ router_A.wait_router_connected('B')
+
+ cls.A_addr = router_A.addresses[0]
+ cls.B_addr = router_B.addresses[0]
+
+
+ cls.router_C_config = [
+ ( 'listener',
+ { 'port': client_ports['C'],
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AC_connector',
+ 'role': 'inter-router',
+ 'port': cls.inter_router_ports['A'],
+ 'verifyHostName': 'no',
+ 'cost': 5,
+ 'stripAnnotations': 'no',
+ 'linkCapacity' : 1000
+ }
+ ),
+ ( 'connector',
+ { 'name': 'BC_connector',
+ 'role': 'inter-router',
+ 'port': cls.inter_router_ports['B'],
+ 'verifyHostName': 'no',
+ 'cost': 5,
+ 'stripAnnotations': 'no',
+ 'linkCapacity' : 1000
+ }
+ )
+ ]
+
+ cls.router_D_config = [
+ ( 'listener',
+ { 'port': client_ports['D'],
+ 'role': 'normal',
+ 'stripAnnotations': 'no'
+ }
+ ),
+ ( 'connector',
+ { 'name': 'AD_connector',
+ 'role': 'inter-router',
+ 'port': cls.inter_router_ports['A'],
+ 'verifyHostName': 'no',
+ 'cost': 7,
+ 'stripAnnotations': 'no',
+ 'linkCapacity' : 1000
+ }
+ ),
+ ( 'connector',
+ { 'name': 'BD_connector',
+ 'role': 'inter-router',
+ 'port': cls.inter_router_ports['B'],
+ 'verifyHostName': 'no',
+ 'cost': 7,
+ 'stripAnnotations': 'no',
+ 'linkCapacity' : 1000
+ }
+ )
+ ]
+
+
+
+ # This method allows test code to add new routers during the test,
+ # rather than only at startup like A and B above.
+ def addRouter ( self, name, more_config ) :
+ config = [ ('router', {'mode': 'interior', 'id': name}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+ ] \
+ + more_config
+
+ config = Qdrouterd.Config(config)
+
+ TopologyAdditionTests.routers.append(TopologyAdditionTests.tester.qdrouterd(name, config, wait=True))
+
+
+ def test_01_new_route_low_cost ( self ):
+ # During the test, test code will add a new router C,
+ # connecting A and B with new low-cost links. At that
+ # point the test's messages should switch from using
+ # route AB to using route ACB.
+ # By passing both of these routes to the test, I tell
+ # it to expect both of them to be used.
+ # If it terminates with the second path remaining unused
+ # it will fail.
+ initial_expected_trace = [ '0/A', '0/B' ]
+ final_expected_trace = [ '0/A', '0/C', '0/B' ]
+
+ test = AddRouter ( self.A_addr,
+ self.B_addr,
+ "closest/01",
+ self,
+ 'C',
+ self.router_C_config,
+ [ initial_expected_trace, final_expected_trace ]
+ )
+ test.run()
+ self.assertEqual ( None, test.error )
+
+
+
+ def test_02_new_route_high_cost ( self ):
+ # During the test, test code will add a new router D,
+ # connecting A and B with new links. But the links are
+ # higher cost than what already exist. The network should
+ # ignore them and keep using the lowest cost route that it
+ # already has.
+ # We communicate this expectation to the test by sending
+ # it a single expected trace. The test will fail with
+ # error if any other traces are encountered.
+ only_expected_trace = [ '0/A', '0/C', '0/B' ]
+
+ test = AddRouter ( self.A_addr,
+ self.B_addr,
+ "closest/02",
+ self,
+ 'D',
+ self.router_D_config,
+ [ only_expected_trace ]
+ )
+ test.run()
+ self.assertEqual ( None, test.error )
+
+
+
+
+#================================================================
+# Tests
+#================================================================
+
+
+#--------------------------------------------------------------
+#
+# First test
+# ------------------
+#
+# Send some messages through the original A---B router network,
+# Then change it to look like this:
+#
+# C
+# / \
+# / \
+# / \
+# / \
+# A -------- B
+#
+# But the caller controls what costs are assigned to the two
+# new links, so only the caller knows whether messages should
+# start to flow through the new route ACB or not. It passes
+# that knowledge in to us as a list of expected paths.
+# This test's job is to make sure that all the expected paths
+# get used by messages, and no others get used.
+#
+#
+# Second test
+# ------------------
+#
+# The triangular network from the first test still exists, and
+# we will add to it a new router D which also connects A and B.
+#
+# C
+# / \
+# / \
+# / \
+# / \
+# A -------- B
+# \ /
+# \ /
+# \ /
+# \ /
+# D
+# As in the first test, the caller tells us what routes ought
+# to be followed, by putting them in the 'expected_traces' arg.
+#
+#--------------------------------------------------------------
+
+class AddRouter ( MessagingHandler ):
+ def __init__ ( self,
+ send_addr,
+ recv_addr,
+ destination,
+ parent,
+ new_router_name,
+ new_router_config,
+ expected_traces
+ ):
+ super(AddRouter, self).__init__(prefetch=100)
+ self.send_addr = send_addr
+ self.recv_addr = recv_addr
+ self.dest = destination
+ self.parent = parent
+ self.new_router_name = new_router_name
+ self.new_router_config = new_router_config
+
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.n_expected = 30
+ self.n_sent = 0
+ self.n_received = 0
+ self.test_timer = None
+ self.send_timer = None
+ self.timeout_count = 0
+ self.reactor = None
+ self.container = None
+ self.finishing = False
+
+ # The parent sends us a list of the traces we
+ # ought to see on messages.
+ # Make a little data structure that
+ # will keep track of how many times each trace was seen.
+ self.expected_trace_counts = list()
+ for i in xrange ( len(expected_traces )) :
+ self.expected_trace_counts.append ( [ expected_traces[i], 0 ] )
+
+
+ def run ( self ) :
+ Container(self).run()
+
+
+ # Close everything and allow the test to terminate.
+ def bail ( self, reason_for_bailing ) :
+ self.error = reason_for_bailing
+ self.receiver.close()
+ self.send_conn.close()
+ self.recv_conn.close()
+ self.test_timer.cancel()
+ self.send_timer.cancel()
+
+
+ # There are two timers. The 'test' timer should only expire if
+ # something has gone wrong, in which case it terminates the test.
+ # The 'send' timer expires frequently, and every time it goes off
+ # we send out a little batch of messages.
+ def timeout ( self, name ):
+ self.timeout_count += 1
+ if name == "test" :
+ self.bail ( "Timeout Expired" )
+ elif name == "send" :
+ self.send()
+ self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
+
+ # At T+5, create the new router with link costs as
+ # specified by parent. We do it partway into the test
+ # so that some messages will flow through the original
+ # network, and some will flow through the network with
+ # the new router added.
+ if self.timeout_count == 5 :
+ self.parent.addRouter ( self.new_router_name, self.new_router_config )
+
+
+ def on_start(self, event):
+ self.reactor = event.reactor
+ self.container = event.container
+
+ self.test_timer = self.reactor.schedule(30, Timeout(self, "test"))
+ self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
+ self.send_conn = event.container.connect(self.send_addr)
+ self.recv_conn = event.container.connect(self.recv_addr)
+ self.sender = event.container.create_sender(self.send_conn, self.dest)
+ self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
+ self.receiver.flow ( self.n_expected )
+
+
+ #------------------------------------------------------------
+ # Sender Side
+ #------------------------------------------------------------
+
+ def send(self):
+ # Send little bursts of 3 messages every sender-timeout.
+ for _ in xrange(3) :
+ msg = Message(body=self.n_sent)
+ self.sender.send(msg)
+ self.n_sent += 1
+
+
+ #------------------------------------------------------------
+ # Receiver Side
+ #------------------------------------------------------------
+
+ def on_message(self, event):
+ if self.finishing :
+ return
+ self.n_received += 1
+ trace = event.message.annotations [ 'x-opt-qd.trace' ]
+ # Deliberate flaw for debugging.
+ # if self.n_received == 13 :
+ # trace = [ '0/B', '0/A', '0/D' ]
+ self.record_trace ( trace )
+ if self.n_received == self.n_expected:
+ self.finishing = True
+ self.finish_test ( )
+
+
+ def record_trace ( self, observed_trace ):
+ for trace_record in self.expected_trace_counts :
+ trace = trace_record [ 0 ]
+ if observed_trace == trace :
+ trace_record [ 1 ] += 1
+ return
+ # If we get here, the trace is one we were not expecting. That's bad.
+ self.bail ( "Unexpected trace: %s" % observed_trace )
+
+
+ # Shut down everything and make sure that all of the extected traces
+ # have been seen.
+ def finish_test ( self ) :
+ self.test_timer.cancel()
+ self.send_timer.cancel()
+ for trace_record in self.expected_trace_counts :
+ count = trace_record[1]
+ # Deliberate flaw for debugging.
+ # count = 0
+ if count <= 0 :
+ self.bail ( "Trace %s was not seen." % trace_record[0] )
+ return
+
+ # success
+ self.bail ( None )
+
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org