You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2018/02/28 17:01:18 UTC

qpid-dispatch git commit: DISPATCH-209 : additive topology tests

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 1134dbae4 -> 2a5633c43


DISPATCH-209 : additive topology tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2a5633c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2a5633c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2a5633c4

Branch: refs/heads/master
Commit: 2a5633c431c1769ba45eb7d42a714e24c8f70ece
Parents: 1134dba
Author: mgoulish <mg...@redhat.com>
Authored: Wed Feb 28 11:58:30 2018 -0500
Committer: mgoulish <mg...@redhat.com>
Committed: Wed Feb 28 11:58:30 2018 -0500

----------------------------------------------------------------------
 tests/CMakeLists.txt                    |   1 +
 tests/system_tests_topology_addition.py | 466 +++++++++++++++++++++++++++
 2 files changed, 467 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2a5633c4/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 1d4d1c8..ee55728 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -105,6 +105,7 @@ foreach(py_test_module
     system_tests_delivery_abort
     system_tests_topology
     system_tests_topology_disposition
+    system_tests_topology_addition
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2a5633c4/tests/system_tests_topology_addition.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology_addition.py b/tests/system_tests_topology_addition.py
new file mode 100644
index 0000000..9feab29
--- /dev/null
+++ b/tests/system_tests_topology_addition.py
@@ -0,0 +1,466 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess      import PIPE, STDOUT
+from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor  import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
+from proton.utils    import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+import datetime
+
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+#====================================================
+# Helper classes for all tests.
+#====================================================
+
+
+# Named timers allow test code to distinguish between several
+# simultaneous timers, going off at different rates.
+class Timeout ( object ):
+    def __init__(self, parent, name):
+        self.parent = parent
+        self.name   = name
+
+    def on_timer_task(self, event):
+        self.parent.timeout ( self.name )
+
+
+
+#================================================================
+#     Setup
+#================================================================
+
+class TopologyAdditionTests ( TestCase ):
+
+    @classmethod
+    def setUpClass(cls):
+        super(TopologyAdditionTests, cls).setUpClass()
+
+        def router ( name, more_config ):
+
+            config = [ ('router',  {'mode': 'interior', 'id': name}),
+                       ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                       ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
+                       ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+                     ]      \
+                     + more_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        client_ports = dict()
+        client_ports['A'] = cls.tester.get_port()
+        client_ports['B'] = cls.tester.get_port()
+        client_ports['C'] = cls.tester.get_port()
+        client_ports['D'] = cls.tester.get_port()
+
+        cls.inter_router_ports = dict()
+        cls.inter_router_ports['A'] = cls.tester.get_port()
+        cls.inter_router_ports['B'] = cls.tester.get_port()
+
+        # Only routers A and B are set up initially by this class.
+        # Routers C and D are started by the test itself.
+        router_A_config = [
+                            ( 'listener',
+                              { 'port': client_ports['A'],
+                                'role': 'normal',
+                                'stripAnnotations': 'no'
+                              }
+                            ),
+                            ( 'listener',
+                              {  'role': 'inter-router',
+                                 'port': cls.inter_router_ports['A']
+                              }
+                            )
+                         ]
+
+        router_B_config = [
+                            ( 'listener',
+                              { 'port': client_ports['B'],
+                                'role': 'normal',
+                                'stripAnnotations': 'no'
+                              }
+                            ),
+                            ( 'listener',
+                              { 'role': 'inter-router',
+                                'port': cls.inter_router_ports['B'],
+                                'stripAnnotations': 'no'
+                              }
+                            ),
+                            ( 'connector',
+                              {  'name': 'AB_connector',
+                                 'role': 'inter-router',
+                                 'port': cls.inter_router_ports['A'],
+                                 'verifyHostName': 'no',
+                                 'cost':  12,
+                                 'stripAnnotations': 'no'
+                              }
+                            )
+                         ]
+
+        router ( 'A', router_A_config )
+        router ( 'B', router_B_config )
+
+        router_A = cls.routers[0]
+        router_B = cls.routers[1]
+
+        router_A.wait_router_connected('B')
+
+        cls.A_addr = router_A.addresses[0]
+        cls.B_addr = router_B.addresses[0]
+
+
+        cls.router_C_config = [
+                                ( 'listener',
+                                  { 'port': client_ports['C'],
+                                    'role': 'normal',
+                                    'stripAnnotations': 'no'
+                                  }
+                                ),
+                                ( 'connector',
+                                  {  'name': 'AC_connector',
+                                     'role': 'inter-router',
+                                     'port': cls.inter_router_ports['A'],
+                                     'verifyHostName': 'no',
+                                     'cost':  5,
+                                     'stripAnnotations': 'no',
+                                     'linkCapacity' : 1000
+                                  }
+                                ),
+                                ( 'connector',
+                                  {  'name': 'BC_connector',
+                                     'role': 'inter-router',
+                                     'port': cls.inter_router_ports['B'],
+                                     'verifyHostName': 'no',
+                                     'cost':  5,
+                                     'stripAnnotations': 'no',
+                                     'linkCapacity' : 1000
+                                  }
+                                )
+                              ]
+
+        cls.router_D_config = [
+                                ( 'listener',
+                                  { 'port': client_ports['D'],
+                                    'role': 'normal',
+                                    'stripAnnotations': 'no'
+                                  }
+                                ),
+                                ( 'connector',
+                                  {  'name': 'AD_connector',
+                                     'role': 'inter-router',
+                                     'port': cls.inter_router_ports['A'],
+                                     'verifyHostName': 'no',
+                                     'cost':  7,
+                                     'stripAnnotations': 'no',
+                                     'linkCapacity' : 1000
+                                  }
+                                ),
+                                ( 'connector',
+                                  {  'name': 'BD_connector',
+                                     'role': 'inter-router',
+                                     'port': cls.inter_router_ports['B'],
+                                     'verifyHostName': 'no',
+                                     'cost':  7,
+                                     'stripAnnotations': 'no',
+                                     'linkCapacity' : 1000
+                                  }
+                                )
+                              ]
+
+
+
+    # This method allows test code to add new routers during the test,
+    # rather than only at startup like A and B above.
+    def addRouter ( self, name, more_config ) :
+        config = [ ('router',  {'mode': 'interior', 'id': name}),
+                   ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                   ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
+                   ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+                 ]    \
+                 + more_config
+
+        config = Qdrouterd.Config(config)
+
+        TopologyAdditionTests.routers.append(TopologyAdditionTests.tester.qdrouterd(name, config, wait=True))
+
+
+    def test_01_new_route_low_cost ( self ):
+        # During the test, test code will add a new router C,
+        # connecting A and B with new low-cost links. At that
+        # point the test's messages should switch from using
+        # route AB to using route ACB.
+        # By passing both of these routes to the test, I tell
+        # it to expect both of them to be used.
+        # If it terminates with the second path remaining unused
+        # it will fail.
+        initial_expected_trace = [ '0/A', '0/B' ]
+        final_expected_trace   = [ '0/A', '0/C', '0/B' ]
+
+        test = AddRouter ( self.A_addr,
+                           self.B_addr,
+                           "closest/01",
+                           self,
+                           'C',
+                           self.router_C_config,
+                           [ initial_expected_trace, final_expected_trace ]
+                         )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+
+    def test_02_new_route_high_cost ( self ):
+        # During the test, test code will add a new router D,
+        # connecting A and B with new links. But the links are
+        # higher cost than what already exist. The network should
+        # ignore them and keep using the lowest cost route that it
+        # already has.
+        # We communicate this expectation to the test by sending
+        # it a single expected trace. The test will fail with
+        # error if any other traces are encountered.
+        only_expected_trace   = [ '0/A', '0/C', '0/B' ]
+
+        test = AddRouter ( self.A_addr,
+                           self.B_addr,
+                           "closest/02",
+                           self,
+                           'D',
+                           self.router_D_config,
+                           [ only_expected_trace ]
+                         )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+
+
+#================================================================
+#     Tests
+#================================================================
+
+
+#--------------------------------------------------------------
+#
+# First test
+# ------------------
+#
+# Send some messages through the original A---B router network,
+# Then change it to look like this:
+#
+#                    C
+#                   / \
+#                  /   \
+#                 /     \
+#                /       \
+#               A -------- B
+#
+# But the caller controls what costs are assigned to the two
+# new links, so only the caller knows whether messages should
+# start to flow through the new route ACB or not. It passes
+# that knowledge in to us as a list of expected paths.
+# This test's job is to make sure that all the expected paths
+# get used by messages, and no others get used.
+#
+#
+# Second test
+# ------------------
+#
+# The triangular network from the first test still exists, and
+# we will add to it a new router D which also connects A and B.
+#
+#                    C
+#                   / \
+#                  /   \
+#                 /     \
+#                /       \
+#               A -------- B
+#                \       /
+#                 \     /
+#                  \   /
+#                   \ /
+#                    D
+# As in the first test, the caller tells us what routes ought
+# to be followed, by putting them in the 'expected_traces' arg.
+#
+#--------------------------------------------------------------
+
+class AddRouter ( MessagingHandler ):
+    def __init__ ( self,
+                   send_addr,
+                   recv_addr,
+                   destination,
+                   parent,
+                   new_router_name,
+                   new_router_config,
+                   expected_traces
+                 ):
+        super(AddRouter, self).__init__(prefetch=100)
+        self.send_addr         = send_addr
+        self.recv_addr         = recv_addr
+        self.dest              = destination
+        self.parent            = parent
+        self.new_router_name   = new_router_name
+        self.new_router_config = new_router_config
+
+        self.error         = None
+        self.sender        = None
+        self.receiver      = None
+        self.n_expected    = 30
+        self.n_sent        = 0
+        self.n_received    = 0
+        self.test_timer    = None
+        self.send_timer    = None
+        self.timeout_count = 0
+        self.reactor       = None
+        self.container     = None
+        self.finishing     = False
+
+        # The parent sends us a list of the traces we
+        # ought to see on messages.
+        # Make a little data structure that
+        # will keep track of how many times each trace was seen.
+        self.expected_trace_counts = list()
+        for i in xrange ( len(expected_traces )) :
+            self.expected_trace_counts.append ( [ expected_traces[i], 0 ] )
+
+
+    def run ( self ) :
+        Container(self).run()
+
+
+    # Close everything and allow the test to terminate.
+    def bail ( self, reason_for_bailing ) :
+        self.error = reason_for_bailing
+        self.receiver.close()
+        self.send_conn.close()
+        self.recv_conn.close()
+        self.test_timer.cancel()
+        self.send_timer.cancel()
+
+
+    # There are two timers. The 'test' timer should only expire if
+    # something has gone wrong, in which case it terminates the test.
+    # The 'send' timer expires frequently, and every time it goes off
+    # we send out a little batch of messages.
+    def timeout ( self, name ):
+        self.timeout_count += 1
+        if name == "test" :
+            self.bail ( "Timeout Expired" )
+        elif name == "send" :
+            self.send()
+            self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
+
+            # At T+5, create the new router with link costs as
+            # specified by parent. We do it partway into the test
+            # so that some messages will flow through the original
+            # network, and some will flow through the network with
+            # the new router added.
+            if self.timeout_count == 5 :
+                self.parent.addRouter ( self.new_router_name, self.new_router_config )
+
+
+    def on_start(self, event):
+        self.reactor   = event.reactor
+        self.container = event.container
+
+        self.test_timer  = self.reactor.schedule(30, Timeout(self, "test"))
+        self.send_timer = self.reactor.schedule(1, Timeout(self, "send"))
+        self.send_conn   = event.container.connect(self.send_addr)
+        self.recv_conn   = event.container.connect(self.recv_addr)
+        self.sender      = event.container.create_sender(self.send_conn, self.dest)
+        self.receiver    = event.container.create_receiver(self.recv_conn, self.dest)
+        self.receiver.flow ( self.n_expected )
+
+
+    #------------------------------------------------------------
+    # Sender Side
+    #------------------------------------------------------------
+
+    def send(self):
+        # Send little bursts of 3 messages every sender-timeout.
+        for _ in xrange(3) :
+            msg = Message(body=self.n_sent)
+            self.sender.send(msg)
+            self.n_sent += 1
+
+
+    #------------------------------------------------------------
+    # Receiver Side
+    #------------------------------------------------------------
+
+    def on_message(self, event):
+        if self.finishing :
+            return
+        self.n_received += 1
+        trace = event.message.annotations [ 'x-opt-qd.trace' ]
+        # Deliberate flaw for debugging.
+        # if self.n_received == 13 :
+        #     trace = [ '0/B', '0/A', '0/D' ]
+        self.record_trace ( trace )
+        if self.n_received == self.n_expected:
+            self.finishing = True
+            self.finish_test ( )
+
+
+    def record_trace ( self, observed_trace ):
+        for trace_record in self.expected_trace_counts :
+            trace = trace_record [ 0 ]
+            if observed_trace == trace :
+                trace_record [ 1 ] += 1
+                return
+        # If we get here, the trace is one we were not expecting. That's bad.
+        self.bail ( "Unexpected trace: %s" % observed_trace )
+
+
+    # Shut down everything and make sure that all of the extected traces
+    # have been seen.
+    def finish_test ( self ) :
+        self.test_timer.cancel()
+        self.send_timer.cancel()
+        for trace_record in self.expected_trace_counts :
+            count = trace_record[1]
+            # Deliberate flaw for debugging.
+            # count = 0
+            if count <= 0 :
+                self.bail ( "Trace %s was not seen." % trace_record[0] )
+                return
+
+        # success
+        self.bail ( None )
+
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org