You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@ignite.apache.org by av...@apache.org on 2021/02/08 11:17:18 UTC

[ignite] branch ignite-ducktape updated: IGNITE-13835 : Improve discovery ducktape test to research small timeouts and behavior on large cluster. (#8522)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 71cc2cf  GNITE-13835 : Improve discovery ducktape test to research small timeouts and behavior on large cluster. (#8522)
71cc2cf is described below

commit 71cc2cfdb001bd6c1d52176a72181ec9ca9d72af
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Mon Feb 8 14:16:50 2021 +0300

    GNITE-13835 : Improve discovery ducktape test to research small timeouts and behavior on large cluster. (#8522)
---
 .../ignitetest/services/utils/ignite_aware.py      |  16 +++
 .../services/utils/templates/discovery_macro.j2    |   4 +-
 .../tests/ignitetest/tests/discovery_test.py       | 123 ++++++++++-----------
 .../tests/ignitetest/utils/ignite_test.py          |   8 ++
 4 files changed, 84 insertions(+), 67 deletions(-)

diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index bc275ce..352b92f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -430,3 +430,19 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
         self.config = self.config._replace(ssl_context_factory=ssl_context_factory)
         self.config = self.config._replace(connector_configuration=ConnectorConfiguration(
             ssl_enabled=True, ssl_context_factory=ssl_context_factory))
+
+    @staticmethod
+    def exec_command(node, cmd):
+        """Executes the command passed on the given node and returns result as string."""
+        return str(node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding())
+
+    @staticmethod
+    def node_id(node):
+        """
+        Returns node id from its log if started.
+        This is a remote call. Reuse its results if possible.
+        """
+        regexp = "^>>> Local node \\[ID=([^,]+),.+$"
+        cmd = "grep -E '%s' %s | sed -r 's/%s/\\1/'" % (regexp, node.log_file, regexp)
+
+        return IgniteAwareService.exec_command(node, cmd).strip().lower()
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
index 9ded542..8fa3168 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
@@ -53,8 +53,8 @@
         {% endif %}
         <property name="localPort" value="{{ spi.port }}"/>
         <property name="localPortRange" value="{{ spi.port_range }}"/>
-        {% if spi.connRecoveryTimeout is defined %}
-            <property name="connectionRecoveryTimeout" value="{{ spi.connRecoveryTimeout }}"/>
+        {% if spi.conn_recovery_timeout is defined %}
+            <property name="connectionRecoveryTimeout" value="{{ spi.conn_recovery_timeout }}"/>
         {% endif %}
         {{ ip_finder(spi) }}
     </bean>
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index f5b21d6..f4622c7 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -19,13 +19,11 @@ Module contains discovery tests.
 
 import os
 import random
-import sys
 from enum import IntEnum
 from time import monotonic
 from typing import NamedTuple
 
 from ducktape.mark import matrix
-
 from ignitetest.services.ignite import IgniteAwareService, IgniteService, get_event_time, node_failed_event_pattern
 from ignitetest.services.ignite_app import IgniteApplicationService
 from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
@@ -60,7 +58,6 @@ class DiscoveryTestConfig(NamedTuple):
     load_type: ClusterLoad = ClusterLoad.NONE
     sequential_failure: bool = False
     with_zk: bool = False
-    failure_detection_timeout: int = 1000
     disable_conn_recovery: bool = False
     net_part: IgniteAwareService.NetPart = IgniteService.NetPart.ALL
 
@@ -73,6 +70,11 @@ class DiscoveryTest(IgniteTest):
     2. Kill random node.
     3. Wait that survived node detects node failure.
     """
+
+    # Name of global param to control failure_detection_timeout used.
+    GLOBAL_DETECTION_TIMEOUT = "failure_detection_timeout"
+
+    # Default maximum numbers of containers to utilize for the test.
     MAX_CONTAINERS = 12
 
     ZOOKEEPER_NODES = 3
@@ -81,7 +83,7 @@ class DiscoveryTest(IgniteTest):
 
     WARMUP_DATA_AMOUNT = 10_000
 
-    FAILURE_TIMEOUT = 800
+    DEFAULT_DETECTION_TIMEOUT = 1000
 
     def __init__(self, test_context):
         super().__init__(test_context=test_context)
@@ -90,17 +92,16 @@ class DiscoveryTest(IgniteTest):
 
     @cluster(num_nodes=MAX_CONTAINERS)
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    @matrix(nodes_to_kill=[1, 2], failure_detection_timeout=[FAILURE_TIMEOUT], disable_conn_recovery=[False, True],
+    @matrix(nodes_to_kill=[1, 2], disable_conn_recovery=[False, True],
             net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT],
             load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
-    def test_nodes_fail_not_sequential_tcp(self, ignite_version, nodes_to_kill, load_type, failure_detection_timeout,
-                                           disable_conn_recovery: bool, net_part: IgniteService.NetPart):
+    def test_nodes_fail_not_sequential_tcp(self, ignite_version, nodes_to_kill, load_type, disable_conn_recovery: bool,
+                                           net_part: IgniteService.NetPart):
         """
         Test nodes failure scenario with TcpDiscoverySpi not allowing nodes to fail in a row.
         """
         test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
                                           load_type=ClusterLoad.construct_from(load_type), sequential_failure=False,
-                                          failure_detection_timeout=failure_detection_timeout,
                                           disable_conn_recovery=disable_conn_recovery, net_part=net_part)
 
         return self._perform_node_fail_scenario(test_config)
@@ -108,16 +109,14 @@ class DiscoveryTest(IgniteTest):
     @cluster(num_nodes=MAX_CONTAINERS)
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
     @matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL],
-            net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT],
-            failure_detection_timeout=[FAILURE_TIMEOUT], disable_conn_recovery=[False, True])
-    def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type, failure_detection_timeout,
-                                         disable_conn_recovery: bool, net_part: IgniteService.NetPart):
+            net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT], disable_conn_recovery=[False, True])
+    def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type, disable_conn_recovery: bool,
+                                         net_part: IgniteService.NetPart):
         """
         Test 2 nodes sequential failure scenario with TcpDiscoverySpi.
         """
         test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2,
                                           load_type=ClusterLoad.construct_from(load_type), sequential_failure=True,
-                                          failure_detection_timeout=failure_detection_timeout,
                                           disable_conn_recovery=disable_conn_recovery, net_part=net_part)
 
         return self._perform_node_fail_scenario(test_config)
@@ -125,50 +124,50 @@ class DiscoveryTest(IgniteTest):
     @cluster(num_nodes=MAX_CONTAINERS)
     @ignore_if(lambda version, globals: version == V_2_8_0)  # ignite-zookeeper package is broken in 2.8.0
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    @matrix(nodes_to_kill=[1, 2], failure_detection_timeout=[FAILURE_TIMEOUT],
-            load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
-    def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, load_type, failure_detection_timeout):
+    @matrix(nodes_to_kill=[1, 2], load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+    def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, load_type):
         """
         Test node failure scenario with ZooKeeperSpi not allowing nodes to fail in a row.
         """
         test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
                                           load_type=ClusterLoad.construct_from(load_type), sequential_failure=False,
-                                          with_zk=True, failure_detection_timeout=failure_detection_timeout)
+                                          with_zk=True)
 
         return self._perform_node_fail_scenario(test_config)
 
     @cluster(num_nodes=MAX_CONTAINERS)
     @ignore_if(lambda version, globals: version == V_2_8_0)  # ignite-zookeeper package is broken in 2.8.0
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    @matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL],
-            failure_detection_timeout=[FAILURE_TIMEOUT])
-    def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type, failure_detection_timeout):
+    @matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+    def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type):
         """
         Test node failure scenario with ZooKeeperSpi not allowing to fail nodes in a row.
         """
-        test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2,
-                                          load_type=ClusterLoad.construct_from(load_type), sequential_failure=True,
-                                          with_zk=True, failure_detection_timeout=failure_detection_timeout)
+        test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, with_zk=True,
+                                          load_type=ClusterLoad.construct_from(load_type), sequential_failure=True)
 
         return self._perform_node_fail_scenario(test_config)
 
+    # pylint: disable=R0914
     def _perform_node_fail_scenario(self, test_config):
-        max_containers = len(self.test_context.cluster)
+        failure_detection_timeout = self._global_int(self.GLOBAL_DETECTION_TIMEOUT, self.DEFAULT_DETECTION_TIMEOUT)
+
+        cluster_size = len(self.test_context.cluster)
 
         # One node is required to detect the failure.
-        assert max_containers >= 1 + test_config.nodes_to_kill + (
-            DiscoveryTest.ZOOKEEPER_NODES if test_config.with_zk else 0) + (
-                   0 if test_config.load_type == ClusterLoad.NONE else 1), "Few required containers: " + \
-                                                                           str(max_containers) + ". Check the params."
+        assert cluster_size >= 1 + test_config.nodes_to_kill + (
+            self.ZOOKEEPER_NODES if test_config.with_zk else 0), \
+            f"Few required containers: {cluster_size}. Check the params."
 
-        self.logger.info("Starting on " + str(max_containers) + " maximal containers.")
+        self.logger.info("Starting on " + str(cluster_size) + " maximal containers.")
+        self.logger.info(f"{self.GLOBAL_DETECTION_TIMEOUT}: {failure_detection_timeout}")
 
         results = {}
 
         modules = ['zookeeper'] if test_config.with_zk else None
 
         if test_config.with_zk:
-            zk_quorum = start_zookeeper(self.test_context, DiscoveryTest.ZOOKEEPER_NODES, test_config)
+            zk_quorum = start_zookeeper(self.test_context, self.ZOOKEEPER_NODES, failure_detection_timeout)
 
             discovery_spi = from_zookeeper_cluster(zk_quorum)
         else:
@@ -178,12 +177,12 @@ class DiscoveryTest(IgniteTest):
                 discovery_spi.so_linger = 0
 
             if test_config.disable_conn_recovery:
-                discovery_spi.connRecoveryTimeout = 0
+                discovery_spi.conn_recovery_timeout = 0
 
         ignite_config = IgniteConfiguration(
             version=test_config.version,
             discovery_spi=discovery_spi,
-            failure_detection_timeout=test_config.failure_detection_timeout,
+            failure_detection_timeout=failure_detection_timeout,
             caches=[CacheConfiguration(
                 name='test-cache',
                 backups=1,
@@ -194,7 +193,8 @@ class DiscoveryTest(IgniteTest):
         # Start Ignite nodes in count less than max_nodes_in_use. One node is erequired for the loader. Some nodes might
         # be needed for ZooKeeper.
         servers, start_servers_sec = start_servers(
-            self.test_context, max_containers - DiscoveryTest.ZOOKEEPER_NODES - 1, ignite_config, modules)
+            self.test_context, cluster_size - self.ZOOKEEPER_NODES - 1,
+            ignite_config, modules)
 
         results['Ignite cluster start time (s)'] = start_servers_sec
 
@@ -204,7 +204,7 @@ class DiscoveryTest(IgniteTest):
             load_config = ignite_config._replace(client_mode=True) if test_config.with_zk else \
                 ignite_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(servers))
 
-            tran_nodes = [node_id(n) for n in failed_nodes] \
+            tran_nodes = [servers.node_id(n) for n in failed_nodes] \
                 if test_config.load_type == ClusterLoad.TRANSACTIONAL else None
 
             params = {"cacheName": "test-cache",
@@ -213,23 +213,27 @@ class DiscoveryTest(IgniteTest):
                       "targetNodes": tran_nodes,
                       "transactional": bool(tran_nodes)}
 
-            start_load_app(self.test_context, ignite_config=load_config, params=params, modules=modules)
+            start_load_app(self.test_context, load_config, params, modules)
 
-        results.update(self._simulate_and_detect_failure(servers, failed_nodes,
-                                                         test_config.failure_detection_timeout * 4,
+        # Detection timeout is 4 * failure_detection_timeout in seconds.
+        detection_timeout_sec = 4 * ignite_config.failure_detection_timeout // 1000
+
+        results.update(self._simulate_and_detect_failure(servers, failed_nodes, detection_timeout_sec,
                                                          test_config.net_part))
 
         return results
 
-    def _simulate_and_detect_failure(self, servers, failed_nodes, timeout, net_part: IgniteAwareService.NetPart):
+    def _simulate_and_detect_failure(self, servers, failed_nodes, event_timeout_sec,
+                                     net_part: IgniteAwareService.NetPart):
         """
         Perform node failure scenario
         """
+        ids_to_wait = []
+
         for node in failed_nodes:
-            self.logger.info(
-                "Simulating failure of node '%s' (order %d) on '%s'" % (node_id(node), order(node), node.name))
+            ids_to_wait.append(servers.node_id(node))
 
-        ids_to_wait = [node_id(n) for n in failed_nodes]
+            self.logger.info("Simulating failure of node '%s' (ID: %s)." % (node.name, ids_to_wait[-1]))
 
         _, first_terminated = servers.drop_network(failed_nodes, net_part=net_part)
 
@@ -237,12 +241,10 @@ class DiscoveryTest(IgniteTest):
         logged_timestamps = []
         data = {}
 
-        start = monotonic()
-
         for survivor in [n for n in servers.nodes if n not in failed_nodes]:
             for failed_id in ids_to_wait:
                 logged_timestamps.append(get_event_time(servers, survivor, node_failed_event_pattern(failed_id),
-                                                        timeout=start + timeout - monotonic()))
+                                                        timeout=event_timeout_sec))
 
             self._check_failed_number(failed_nodes, survivor)
 
@@ -261,15 +263,17 @@ class DiscoveryTest(IgniteTest):
         """Ensures number of failed nodes is correct."""
         cmd = "grep '%s' %s | wc -l" % (node_failed_event_pattern(), survived_node.log_file)
 
-        failed_cnt = int(str(survived_node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding()))
+        failed_cnt = int(IgniteApplicationService.exec_command(survived_node, cmd))
+
+        # Cache survivor id, do not read each time.
+        surv_id = IgniteApplicationService.node_id(survived_node)
 
         if failed_cnt != len(failed_nodes):
-            failed = str(survived_node.account.ssh_client.exec_command(
-                "grep '%s' %s" % (node_failed_event_pattern(), survived_node.log_file))[1].read(),
-                         sys.getdefaultencoding())
+            failed = IgniteApplicationService.exec_command(survived_node, "grep '%s' %s" % (node_failed_event_pattern(),
+                                                                                            survived_node.log_file))
 
-            self.logger.warn("Node '%s' (%s) has detected the following failures:%s%s" % (
-                survived_node.name, node_id(survived_node), os.linesep, failed))
+            self.logger.warn("Node '%s' (%s) has detected the following failures:%s%s" %
+                             (survived_node.name, surv_id, os.linesep, failed))
 
             raise AssertionError(
                 "Wrong number of failed nodes: %d. Expected: %d. Check the logs." % (failed_cnt, len(failed_nodes)))
@@ -280,18 +284,18 @@ class DiscoveryTest(IgniteTest):
             for node in [srv_node for srv_node in service.nodes if srv_node not in failed_nodes]:
                 cmd = "grep -i '%s' %s | wc -l" % ("local node segmented", node.log_file)
 
-                failed = str(node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding())
+                failed = IgniteApplicationService.exec_command(node, cmd)
 
                 if int(failed) > 0:
                     raise AssertionError(
                         "Wrong node failed (segmented) on '%s'. Check the logs." % node.name)
 
 
-def start_zookeeper(test_context, num_nodes, test_config):
+def start_zookeeper(test_context, num_nodes, failure_detection_timeout):
     """
     Start zookeeper cluster.
     """
-    zk_settings = ZookeeperSettings(min_session_timeout=test_config.failure_detection_timeout)
+    zk_settings = ZookeeperSettings(min_session_timeout=failure_detection_timeout)
 
     zk_quorum = ZookeeperService(test_context, num_nodes, settings=zk_settings)
     zk_quorum.start()
@@ -304,8 +308,7 @@ def start_servers(test_context, num_nodes, ignite_config, modules=None):
     """
     servers = IgniteService(test_context, config=ignite_config, num_nodes=num_nodes, modules=modules,
                             # mute spam in log.
-                            jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"],
-                            startup_timeout_sec=100)
+                            jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"])
 
     start = monotonic()
     servers.start()
@@ -343,13 +346,3 @@ def choose_node_to_kill(servers, nodes_to_kill, sequential):
     assert len(to_kill) == nodes_to_kill, "Unable to pick up required number of nodes to kill."
 
     return to_kill
-
-
-def order(node):
-    """Return discovery order of the node."""
-    return node.discovery_info().order
-
-
-def node_id(node):
-    """Return node id."""
-    return node.discovery_info().node_id
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 677141c..5c9fcad 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -55,3 +55,11 @@ class IgniteTest(Test):
         self.logger.debug("All services killed.")
 
         super().tearDown()
+
+    def _global_param(self, param_name, default=None):
+        """Reads global parameter passed to the test suite."""
+        return self.test_context.globals.get(param_name, default)
+
+    def _global_int(self, param_name, default: int = None):
+        """Reads global parameter passed to the test suite and converts to int."""
+        return int(self._global_param(param_name, default))