You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2017/11/29 17:34:45 UTC

qpid-dispatch git commit: DISPATCH-209 : first topology test

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e494da0a9 -> 58ecc97bc


DISPATCH-209 : first topology test


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58ecc97b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58ecc97b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58ecc97b

Branch: refs/heads/master
Commit: 58ecc97bcc3d9f89f62d308d3c9b01027151f5e3
Parents: e494da0
Author: mgoulish <mg...@redhat.com>
Authored: Wed Nov 29 11:49:59 2017 -0500
Committer: mgoulish <mg...@redhat.com>
Committed: Wed Nov 29 11:49:59 2017 -0500

----------------------------------------------------------------------
 tests/CMakeLists.txt           |   1 +
 tests/system_tests_topology.py | 609 ++++++++++++++++++++++++++++++++++++
 2 files changed, 610 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index aaf65ec..0c6454c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -100,6 +100,7 @@ foreach(py_test_module
     system_tests_denied_unsettled_multicast
     system_tests_auth_service_plugin
     system_tests_delivery_abort
+    system_tests_topology
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/system_tests_topology.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py
new file mode 100644
index 0000000..5e58483
--- /dev/null
+++ b/tests/system_tests_topology.py
@@ -0,0 +1,609 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess      import PIPE, STDOUT
+from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor  import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
+from proton.utils    import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+import datetime
+import pdb
+
+
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+
+
+#------------------------------------------------
+# Helper classes for all tests.
+#------------------------------------------------
+
+class Timeout(object):
+    """
+    Named timeout object can handle multiple simultaneous
+    timers, by telling the parent which one fired.
+    """
+    def __init__ ( self, parent, name ):
+        self.parent = parent
+        self.name   = name
+
+    def on_timer_task ( self, event ):
+        self.parent.timeout ( self.name )
+
+
+
+class ManagementMessageHelper ( object ):
+    """
+    Format management messages.
+    """
+    def __init__ ( self, reply_addr ):
+        self.reply_addr = reply_addr
+
+    def make_connector_query ( self, connector_name ):
+        props = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+        msg = Message ( properties=props, reply_to=self.reply_addr )
+        return msg
+
+    def make_connector_delete_command ( self, connector_name ):
+        props = {'operation': 'DELETE', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+        msg = Message ( properties=props, reply_to=self.reply_addr )
+        return msg
+
+
+#------------------------------------------------
+# END Helper classes for all tests.
+#------------------------------------------------
+
+
+
+
+
+#================================================================
+#     Setup
+#================================================================
+
+class TopologyTests ( TestCase ):
+
+    @classmethod
+    def setUpClass(cls):
+        super(TopologyTests, cls).setUpClass()
+
+
+
+        def router(name, more_config):
+
+            config = [ ('router',  {'mode': 'interior', 'id': name}),
+                       ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                       ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
+                       ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+                     ]    \
+                     + more_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        A_client_port = cls.tester.get_port()
+        B_client_port = cls.tester.get_port()
+        C_client_port = cls.tester.get_port()
+        D_client_port = cls.tester.get_port()
+
+        A_inter_router_port = cls.tester.get_port()
+        B_inter_router_port = cls.tester.get_port()
+        C_inter_router_port = cls.tester.get_port()
+
+        #
+        #
+        #  Topology of the 4-mesh, with costs of connections marked.
+        #  Tail of arrow indicates initiator of connection.
+        #
+        #                1
+        #         D ----------> A
+        #         | \         > ^
+        #         | 20\   50/   |
+        #         |     \ /     |
+        #      1  |     / \     | 100
+        #         |   /     \   |
+        #         v /         > |
+        #         C ----------> B
+        #                1
+        #
+        #  Test 1 TopologyFailover Notes
+        #
+        #       1. Messages are always sent from A, and go to B.
+        #       2. First route ahould be ADCB.
+        #       3. Then we kill connector CD.
+        #       4. Next route should be ADB.
+        #       5. Then we kill connector BD.
+        #       6. Next route should be ACB.
+        #       7. Then we kill connector BC.
+        #       8. Final route should be AB.
+
+        cls.A_B_cost =  100
+        cls.A_C_cost =   50
+        cls.A_D_cost =    1
+        cls.B_C_cost =    1
+        cls.B_D_cost =   20
+        cls.C_D_cost =    1
+
+        router ( 'A',
+                 [
+                    ( 'listener',
+                      { 'port': A_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': A_inter_router_port,
+                        'stripAnnotations': 'no'
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'B',
+                 [
+                    ( 'listener',
+                      { 'port': B_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': B_inter_router_port,
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'AB_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost':  cls.A_B_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'C',
+                 [
+                    ( 'listener',
+                      { 'port': C_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': C_inter_router_port,
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'AC_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.A_C_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'BC_connector',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_C_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'D',
+                 [
+                    ( 'listener',
+                      { 'port': D_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'AD_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.A_D_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'BD_connector',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_D_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'CD_connector',
+                         'role': 'inter-router',
+                         'port': C_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.C_D_cost,
+                         'stripAnnotations': 'no'
+                      }
+                    )
+                 ]
+               )
+
+
+        router_A = cls.routers[0]
+        router_B = cls.routers[1]
+        router_C = cls.routers[2]
+        router_D = cls.routers[3]
+
+        router_A.wait_router_connected('B')
+        router_A.wait_router_connected('C')
+        router_A.wait_router_connected('D')
+
+        cls.client_addrs = ( router_A.addresses[0],
+                             router_B.addresses[0],
+                             router_C.addresses[0],
+                             router_D.addresses[0]
+                           )
+
+        # 1 means skip that test.
+        cls.skip = { 'test_01' : 0
+                   }
+
+
+
+    def test_01_topology_failover ( self ):
+        name = 'test_01'
+        if self.skip [ name ] :
+            self.skipTest ( "Test skipped during development." )
+        test = TopologyFailover ( name,
+                                  self.client_addrs,
+                                  "closest/01"
+                                )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+
+#================================================================
+#     Tests
+#================================================================
+
+
+# Also see 'Test 1 TopologyFailover Notes', above.
+
+class TopologyFailover ( MessagingHandler ):
+    """
+    Test that the lowest-cost route is always chosen in a 4-mesh
+    network topology, as one link after another is lost.
+
+    This test also ensures that connections that have been
+    deliberately severed do no get restored.
+    """
+    def __init__ ( self, test_name, client_addrs, destination ):
+        super(TopologyFailover, self).__init__(prefetch=0)
+        self.client_addrs     = client_addrs
+        self.dest       = destination
+        self.error      = None
+        self.sender     = None
+        self.receiver   = None
+        self.test_timer = None
+        self.send_timer = None
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+        self.n_released = 0
+        self.reactor    = None
+        self.state      = None
+        self.send_conn  = None
+        self.recv_conn  = None
+        self.nap_time   = 2
+        self.debug      = False
+
+        # Holds the management sender, receiver, and 'helper'
+        # associated with each router.
+        self.routers = {
+                         'A' : dict(),
+                         'B' : dict(),
+                         'C' : dict(),
+                         'D' : dict()
+                       }
+
+        # These are the expectes routing traces, in the order we
+        # expect to receive them.
+        self.expected_traces = [
+                                 [u'0/A', u'0/D', u'0/C', u'0/B'],
+                                 [u'0/A', u'0/D', u'0/B'],
+                                 [u'0/A', u'0/C', u'0/B'],
+                                 [u'0/A', u'0/B']
+                               ]
+        self.trace_count    = 0
+
+        # This tells the system in what order to kill the connectors.
+        self.kill_list = (
+                           ( 'D', 'CD_connector' ),
+                           ( 'D', 'BD_connector' ),
+                           ( 'C', 'BC_connector' )
+                         )
+
+        # Use this to keep track of which connectors we have found
+        # when the test is first getting started and we are checking
+        # the topology.
+        self.connectors_map = { 'AB_connector' : 0,
+                                'AC_connector' : 0,
+                                'AD_connector' : 0,
+                                'BC_connector' : 0,
+                                'BD_connector' : 0,
+                                'CD_connector' : 0
+                              }
+
+
+    # The simple state machine transitions when certain events happen,
+    # if certain conditions are met.  The conditions are checked for
+    # by the callbacks for the events.
+    # The normal sequence of states in the state machine is:
+    #  1. starting        -- doesn't do anything
+    #  2. checking        -- checks initial topology
+    #  3. examine_trace   -- look at routing trace of first message
+    #  4. kill_connector  -- kills the first connector (CD)
+    #  5. examine_trace   -- checks routing trace of next message
+    #  5. kill_connector  -- kills the next connector  (BD)
+    #  5. examine_trace   -- checks routing trace of next message
+    #  5. kill_connector  -- kills the next connector  (BC)
+    #  5. examine_trace   -- checks routing trace of final message
+    #  5. bailing         -- bails out with success
+
+
+    def state_transition ( self, message, new_state ) :
+        if self.state == new_state :
+            return
+        self.state = new_state
+        self.debug_print ( "state transition to : %s -- because %s" % ( self.state, message ) )
+
+
+    def debug_print ( self, text ) :
+        if self.debug == True:
+            print time.time(), text
+
+
+    # Shut down everything and exit.
+    def bail ( self, text ):
+        self.error = text
+
+        self.send_conn.close ( )
+        self.recv_conn.close ( )
+
+        self.routers['B'] ['mgmt_conn'].close()
+        self.routers['C'] ['mgmt_conn'].close()
+        self.routers['D'] ['mgmt_conn'].close()
+
+        self.test_timer.cancel ( )
+        self.send_timer.cancel ( )
+
+
+    #------------------------------------------------------------------------
+    # I want some behavior from this test that is a little too complex
+    # to be governed by the usual callback functions. The way I do this
+    # is by making a simple state machine that checks some conditions
+    # during some callback, and then either steps forward or terminates
+    # the test.
+    # The callbacks that activate the state machine are mostly on_message,
+    # or timeout.  But there are two different timers: the one-second
+    # timer that mostly runs the test, and the 60-second timer that, if it
+    # fires, will terminate the test with a timeout error.
+    #------------------------------------------------------------------------
+    def timeout ( self, name ):
+        if name == 'test':
+            self.set_state ( 'Timeout Expired', 'bailing' )
+            self.bail ( "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+                         ( self.n_sent, self.n_received, self.n_accepted ) )
+        elif name == 'sender':
+            if self.state == 'examine_trace' :
+                self.send ( )
+            self.send_timer = self.reactor.schedule(1, Timeout(self, "sender"))
+
+
+    def on_start ( self, event ):
+        self.state_transition ( 'on_start', 'starting' )
+        self.reactor = event.reactor
+        self.test_timer = event.reactor.schedule ( 60, Timeout(self, "test") )
+        self.send_timer = event.reactor.schedule ( 1, Timeout(self, "sender") )
+        self.send_conn  = event.container.connect ( self.client_addrs[0] ) # A
+        self.recv_conn  = event.container.connect ( self.client_addrs[1] ) # B
+
+        self.sender     = event.container.create_sender   ( self.send_conn, self.dest )
+        self.receiver   = event.container.create_receiver ( self.recv_conn, self.dest )
+        self.receiver.flow ( 100 )
+
+        # I will only send management messages to B, C, and D, because
+        # they are the owners of the connections that I will want to delete.
+        self.routers['B'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[1] )
+        self.routers['C'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[2] )
+        self.routers['D'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[3] )
+
+        self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True )
+        self.routers['C'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['C'] ['mgmt_conn'], dynamic=True )
+        self.routers['D'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['D'] ['mgmt_conn'], dynamic=True )
+
+        self.routers['B'] ['mgmt_sender']   = event.container.create_sender   ( self.routers['B'] ['mgmt_conn'], "$management" )
+        self.routers['C'] ['mgmt_sender']   = event.container.create_sender   ( self.routers['C'] ['mgmt_conn'], "$management" )
+        self.routers['D'] ['mgmt_sender']   = event.container.create_sender   ( self.routers['D'] ['mgmt_conn'], "$management" )
+
+
+
+    #-----------------------------------------------------------------
+    # At start-time, as the links to the three managed routers
+    # open, check each one to make sure that it has all the expected
+    # connections.
+    #-----------------------------------------------------------------
+    def on_link_opened ( self, event ) :
+        self.state_transition ( 'on_link_opened', 'checking' )
+        # The B mgmt link has opened. Check its connections. --------------------------
+        if event.receiver == self.routers['B'] ['mgmt_receiver'] :
+            event.receiver.flow ( 1000 )
+            self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+            for connector in [ 'AB_connector' ] :
+                self.connector_check ( 'B', connector )
+        # The C mgmt link has opened. Check its connections. --------------------------
+        elif event.receiver == self.routers['C'] ['mgmt_receiver'] :
+            event.receiver.flow ( 1000 )
+            self.routers['C'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+            for connector in [ 'AC_connector', 'BC_connector' ] :
+                self.connector_check ( 'C', connector )
+        # The D mgmt link has opened. Check its connections. --------------------------
+        elif event.receiver == self.routers['D'] ['mgmt_receiver']:
+            event.receiver.flow ( 1000 )
+            self.routers['D'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address )
+            for connector in [ 'AD_connector', 'BD_connector', 'CD_connector' ] :
+                self.connector_check ( 'D', connector )
+
+
+    def send ( self ):
+        n_sent_this_time = 0
+        if self.sender.credit <= 0:
+          self.receiver.flow ( 100 )
+          return
+        # Send messages one at a time.
+        if self.sender.credit > 0 :
+            msg = Message ( body=self.n_sent )
+            self.sender.send ( msg )
+            n_sent_this_time += 1
+            self.n_sent += 1
+        self.debug_print ( "sent: %d" % self.n_sent )
+
+
+    def on_message ( self, event ):
+
+        if event.receiver == self.routers['B'] ['mgmt_receiver'] or \
+           event.receiver == self.routers['C'] ['mgmt_receiver'] or \
+           event.receiver == self.routers['D'] ['mgmt_receiver'] :
+
+            #----------------------------------------------------------------
+            # This is a management message.
+            #----------------------------------------------------------------
+            if self.state == 'checking' :
+                connection_name = event.message.body['name']
+
+                if connection_name in self.connectors_map :
+                    self.connectors_map [ connection_name ] = 1
+                else :
+                  self.state_transition ( "bad connection name: %s" % connection_name, 'bailing' )
+                  self.bail ( "bad connection name: %s" % connection_name )
+
+                n_connections = sum(self.connectors_map.values())
+                if n_connections == 6 :
+                  self.state_transition ( "all %d connections found" % n_connections, 'examine_trace' )
+            elif self.state == 'kill_connector' :
+                if event.message.properties["statusDescription"] == 'No Content':
+                    # We are in the process of killing a connector, and
+                    # have received the response to the kill message.
+                    self.state_transition ( 'got kill response', 'examine_trace' )
+                    # This sleep is here because one early bug that this test found
+                    # (and which is now fixed) involved connections that had been 
+                    # deleted coming back sometimes. It was a race and only happened 
+                    # very occasionally -- but with a pause here, after getting 
+                    # confirmation that we have successfully deleted the connector, 
+                    # the bug would show up 60 to 75% of the time. I think that leaving
+                    # this sleep here is the only way to ensure that that particular
+                    # bug stays fixed.
+                    time.sleep ( self.nap_time )
+        else:
+            #----------------------------------------------------------------
+            # This is a payload message.
+            #----------------------------------------------------------------
+            self.n_received += 1
+            if self.state == 'examine_trace' :
+                trace    = event.message.annotations [ 'x-opt-qd.trace' ]
+                expected = self.expected_traces [ self.trace_count ]
+                if trace == expected :
+                    if self.trace_count == len(self.expected_traces) - 1 :
+                        self.state_transition ( 'final expected trace %s observed' % expected, 'bailing' )
+                        self.bail ( None )
+                        return
+                    self.state_transition ( "expected trace %d observed successfully %s" % ( self.trace_count, expected ) , 'kill_connector' )
+                    self.kill_a_connector ( self.kill_list[self.trace_count] )
+                    self.trace_count += 1
+                else :
+                    self.state_transition ( "expected trace %s but got %s" % ( expected, trace ), 'bailing' )
+                    self.bail ( "expected trace %s but got %s" % ( expected, trace ) )
+
+
+    def on_accepted ( self, event ):
+        self.n_accepted += 1
+
+
+    def on_released ( self, event ) :
+        self.n_released += 1
+
+
+    def connector_check ( self, router, connector ) :
+        self.debug_print ( "checking connector for router %s" % router )
+        mgmt_helper = self.routers[router] ['mgmt_helper']
+        mgmt_sender = self.routers[router] ['mgmt_sender']
+        msg = mgmt_helper.make_connector_query ( connector )
+        mgmt_sender.send ( msg )
+
+
+    def kill_a_connector ( self, target ) :
+        router = target[0]
+        connector = target[1]
+        self.debug_print ( "killing connector %s on router %s" % (connector, router) )
+        mgmt_helper = self.routers[router] ['mgmt_helper']
+        mgmt_sender = self.routers[router] ['mgmt_sender']
+        msg = mgmt_helper.make_connector_delete_command ( connector )
+        mgmt_sender.send ( msg )
+
+
+
+    def run(self):
+        Container(self).run()
+
+
+
+
+if __name__ == '__main__':
+    # Run this module's tests when the file is executed directly as a script.
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org