You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/02/08 11:17:18 UTC
[ignite] branch ignite-ducktape updated: IGNITE-13835 : Improve
discovery ducktape test to research small timeouts and behavior on large
cluster. (#8522)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 71cc2cf IGNITE-13835 : Improve discovery ducktape test to research small timeouts and behavior on large cluster. (#8522)
71cc2cf is described below
commit 71cc2cfdb001bd6c1d52176a72181ec9ca9d72af
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Mon Feb 8 14:16:50 2021 +0300
IGNITE-13835 : Improve discovery ducktape test to research small timeouts and behavior on large cluster. (#8522)
---
.../ignitetest/services/utils/ignite_aware.py | 16 +++
.../services/utils/templates/discovery_macro.j2 | 4 +-
.../tests/ignitetest/tests/discovery_test.py | 123 ++++++++++-----------
.../tests/ignitetest/utils/ignite_test.py | 8 ++
4 files changed, 84 insertions(+), 67 deletions(-)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index bc275ce..352b92f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -430,3 +430,19 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
self.config = self.config._replace(ssl_context_factory=ssl_context_factory)
self.config = self.config._replace(connector_configuration=ConnectorConfiguration(
ssl_enabled=True, ssl_context_factory=ssl_context_factory))
+
+ @staticmethod
+ def exec_command(node, cmd):
+ """Executes the command passed on the given node and returns result as string."""
+ return str(node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding())
+
+ @staticmethod
+ def node_id(node):
+ """
+ Returns node id from its log if started.
+ This is a remote call. Reuse its results if possible.
+ """
+ regexp = "^>>> Local node \\[ID=([^,]+),.+$"
+ cmd = "grep -E '%s' %s | sed -r 's/%s/\\1/'" % (regexp, node.log_file, regexp)
+
+ return IgniteAwareService.exec_command(node, cmd).strip().lower()
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
index 9ded542..8fa3168 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
@@ -53,8 +53,8 @@
{% endif %}
<property name="localPort" value="{{ spi.port }}"/>
<property name="localPortRange" value="{{ spi.port_range }}"/>
- {% if spi.connRecoveryTimeout is defined %}
- <property name="connectionRecoveryTimeout" value="{{ spi.connRecoveryTimeout }}"/>
+ {% if spi.conn_recovery_timeout is defined %}
+ <property name="connectionRecoveryTimeout" value="{{ spi.conn_recovery_timeout }}"/>
{% endif %}
{{ ip_finder(spi) }}
</bean>
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index f5b21d6..f4622c7 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -19,13 +19,11 @@ Module contains discovery tests.
import os
import random
-import sys
from enum import IntEnum
from time import monotonic
from typing import NamedTuple
from ducktape.mark import matrix
-
from ignitetest.services.ignite import IgniteAwareService, IgniteService, get_event_time, node_failed_event_pattern
from ignitetest.services.ignite_app import IgniteApplicationService
from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
@@ -60,7 +58,6 @@ class DiscoveryTestConfig(NamedTuple):
load_type: ClusterLoad = ClusterLoad.NONE
sequential_failure: bool = False
with_zk: bool = False
- failure_detection_timeout: int = 1000
disable_conn_recovery: bool = False
net_part: IgniteAwareService.NetPart = IgniteService.NetPart.ALL
@@ -73,6 +70,11 @@ class DiscoveryTest(IgniteTest):
2. Kill random node.
3. Wait that survived node detects node failure.
"""
+
+ # Name of global param to control failure_detection_timeout used.
+ GLOBAL_DETECTION_TIMEOUT = "failure_detection_timeout"
+
+ # Default maximum numbers of containers to utilize for the test.
MAX_CONTAINERS = 12
ZOOKEEPER_NODES = 3
@@ -81,7 +83,7 @@ class DiscoveryTest(IgniteTest):
WARMUP_DATA_AMOUNT = 10_000
- FAILURE_TIMEOUT = 800
+ DEFAULT_DETECTION_TIMEOUT = 1000
def __init__(self, test_context):
super().__init__(test_context=test_context)
@@ -90,17 +92,16 @@ class DiscoveryTest(IgniteTest):
@cluster(num_nodes=MAX_CONTAINERS)
@ignite_versions(str(DEV_BRANCH), str(LATEST))
- @matrix(nodes_to_kill=[1, 2], failure_detection_timeout=[FAILURE_TIMEOUT], disable_conn_recovery=[False, True],
+ @matrix(nodes_to_kill=[1, 2], disable_conn_recovery=[False, True],
net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT],
load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
- def test_nodes_fail_not_sequential_tcp(self, ignite_version, nodes_to_kill, load_type, failure_detection_timeout,
- disable_conn_recovery: bool, net_part: IgniteService.NetPart):
+ def test_nodes_fail_not_sequential_tcp(self, ignite_version, nodes_to_kill, load_type, disable_conn_recovery: bool,
+ net_part: IgniteService.NetPart):
"""
Test nodes failure scenario with TcpDiscoverySpi not allowing nodes to fail in a row.
"""
test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
load_type=ClusterLoad.construct_from(load_type), sequential_failure=False,
- failure_detection_timeout=failure_detection_timeout,
disable_conn_recovery=disable_conn_recovery, net_part=net_part)
return self._perform_node_fail_scenario(test_config)
@@ -108,16 +109,14 @@ class DiscoveryTest(IgniteTest):
@cluster(num_nodes=MAX_CONTAINERS)
@ignite_versions(str(DEV_BRANCH), str(LATEST))
@matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL],
- net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT],
- failure_detection_timeout=[FAILURE_TIMEOUT], disable_conn_recovery=[False, True])
- def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type, failure_detection_timeout,
- disable_conn_recovery: bool, net_part: IgniteService.NetPart):
+ net_part=[IgniteService.NetPart.ALL, IgniteService.NetPart.INPUT], disable_conn_recovery=[False, True])
+ def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type, disable_conn_recovery: bool,
+ net_part: IgniteService.NetPart):
"""
Test 2 nodes sequential failure scenario with TcpDiscoverySpi.
"""
test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2,
load_type=ClusterLoad.construct_from(load_type), sequential_failure=True,
- failure_detection_timeout=failure_detection_timeout,
disable_conn_recovery=disable_conn_recovery, net_part=net_part)
return self._perform_node_fail_scenario(test_config)
@@ -125,50 +124,50 @@ class DiscoveryTest(IgniteTest):
@cluster(num_nodes=MAX_CONTAINERS)
@ignore_if(lambda version, globals: version == V_2_8_0) # ignite-zookeeper package is broken in 2.8.0
@ignite_versions(str(DEV_BRANCH), str(LATEST))
- @matrix(nodes_to_kill=[1, 2], failure_detection_timeout=[FAILURE_TIMEOUT],
- load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
- def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, load_type, failure_detection_timeout):
+ @matrix(nodes_to_kill=[1, 2], load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+ def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, load_type):
"""
Test node failure scenario with ZooKeeperSpi not allowing nodes to fail in a row.
"""
test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
load_type=ClusterLoad.construct_from(load_type), sequential_failure=False,
- with_zk=True, failure_detection_timeout=failure_detection_timeout)
+ with_zk=True)
return self._perform_node_fail_scenario(test_config)
@cluster(num_nodes=MAX_CONTAINERS)
@ignore_if(lambda version, globals: version == V_2_8_0) # ignite-zookeeper package is broken in 2.8.0
@ignite_versions(str(DEV_BRANCH), str(LATEST))
- @matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL],
- failure_detection_timeout=[FAILURE_TIMEOUT])
- def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type, failure_detection_timeout):
+ @matrix(load_type=[ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+ def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type):
"""
Test node failure scenario with ZooKeeperSpi not allowing to fail nodes in a row.
"""
- test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2,
- load_type=ClusterLoad.construct_from(load_type), sequential_failure=True,
- with_zk=True, failure_detection_timeout=failure_detection_timeout)
+ test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, with_zk=True,
+ load_type=ClusterLoad.construct_from(load_type), sequential_failure=True)
return self._perform_node_fail_scenario(test_config)
+ # pylint: disable=R0914
def _perform_node_fail_scenario(self, test_config):
- max_containers = len(self.test_context.cluster)
+ failure_detection_timeout = self._global_int(self.GLOBAL_DETECTION_TIMEOUT, self.DEFAULT_DETECTION_TIMEOUT)
+
+ cluster_size = len(self.test_context.cluster)
# One node is required to detect the failure.
- assert max_containers >= 1 + test_config.nodes_to_kill + (
- DiscoveryTest.ZOOKEEPER_NODES if test_config.with_zk else 0) + (
- 0 if test_config.load_type == ClusterLoad.NONE else 1), "Few required containers: " + \
- str(max_containers) + ". Check the params."
+ assert cluster_size >= 1 + test_config.nodes_to_kill + (
+ self.ZOOKEEPER_NODES if test_config.with_zk else 0), \
+ f"Few required containers: {cluster_size}. Check the params."
- self.logger.info("Starting on " + str(max_containers) + " maximal containers.")
+ self.logger.info("Starting on " + str(cluster_size) + " maximal containers.")
+ self.logger.info(f"{self.GLOBAL_DETECTION_TIMEOUT}: {failure_detection_timeout}")
results = {}
modules = ['zookeeper'] if test_config.with_zk else None
if test_config.with_zk:
- zk_quorum = start_zookeeper(self.test_context, DiscoveryTest.ZOOKEEPER_NODES, test_config)
+ zk_quorum = start_zookeeper(self.test_context, self.ZOOKEEPER_NODES, failure_detection_timeout)
discovery_spi = from_zookeeper_cluster(zk_quorum)
else:
@@ -178,12 +177,12 @@ class DiscoveryTest(IgniteTest):
discovery_spi.so_linger = 0
if test_config.disable_conn_recovery:
- discovery_spi.connRecoveryTimeout = 0
+ discovery_spi.conn_recovery_timeout = 0
ignite_config = IgniteConfiguration(
version=test_config.version,
discovery_spi=discovery_spi,
- failure_detection_timeout=test_config.failure_detection_timeout,
+ failure_detection_timeout=failure_detection_timeout,
caches=[CacheConfiguration(
name='test-cache',
backups=1,
@@ -194,7 +193,8 @@ class DiscoveryTest(IgniteTest):
# Start Ignite nodes in count less than max_nodes_in_use. One node is erequired for the loader. Some nodes might
# be needed for ZooKeeper.
servers, start_servers_sec = start_servers(
- self.test_context, max_containers - DiscoveryTest.ZOOKEEPER_NODES - 1, ignite_config, modules)
+ self.test_context, cluster_size - self.ZOOKEEPER_NODES - 1,
+ ignite_config, modules)
results['Ignite cluster start time (s)'] = start_servers_sec
@@ -204,7 +204,7 @@ class DiscoveryTest(IgniteTest):
load_config = ignite_config._replace(client_mode=True) if test_config.with_zk else \
ignite_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(servers))
- tran_nodes = [node_id(n) for n in failed_nodes] \
+ tran_nodes = [servers.node_id(n) for n in failed_nodes] \
if test_config.load_type == ClusterLoad.TRANSACTIONAL else None
params = {"cacheName": "test-cache",
@@ -213,23 +213,27 @@ class DiscoveryTest(IgniteTest):
"targetNodes": tran_nodes,
"transactional": bool(tran_nodes)}
- start_load_app(self.test_context, ignite_config=load_config, params=params, modules=modules)
+ start_load_app(self.test_context, load_config, params, modules)
- results.update(self._simulate_and_detect_failure(servers, failed_nodes,
- test_config.failure_detection_timeout * 4,
+ # Detection timeout is 4 * failure_detection_timeout in seconds.
+ detection_timeout_sec = 4 * ignite_config.failure_detection_timeout // 1000
+
+ results.update(self._simulate_and_detect_failure(servers, failed_nodes, detection_timeout_sec,
test_config.net_part))
return results
- def _simulate_and_detect_failure(self, servers, failed_nodes, timeout, net_part: IgniteAwareService.NetPart):
+ def _simulate_and_detect_failure(self, servers, failed_nodes, event_timeout_sec,
+ net_part: IgniteAwareService.NetPart):
"""
Perform node failure scenario
"""
+ ids_to_wait = []
+
for node in failed_nodes:
- self.logger.info(
- "Simulating failure of node '%s' (order %d) on '%s'" % (node_id(node), order(node), node.name))
+ ids_to_wait.append(servers.node_id(node))
- ids_to_wait = [node_id(n) for n in failed_nodes]
+ self.logger.info("Simulating failure of node '%s' (ID: %s)." % (node.name, ids_to_wait[-1]))
_, first_terminated = servers.drop_network(failed_nodes, net_part=net_part)
@@ -237,12 +241,10 @@ class DiscoveryTest(IgniteTest):
logged_timestamps = []
data = {}
- start = monotonic()
-
for survivor in [n for n in servers.nodes if n not in failed_nodes]:
for failed_id in ids_to_wait:
logged_timestamps.append(get_event_time(servers, survivor, node_failed_event_pattern(failed_id),
- timeout=start + timeout - monotonic()))
+ timeout=event_timeout_sec))
self._check_failed_number(failed_nodes, survivor)
@@ -261,15 +263,17 @@ class DiscoveryTest(IgniteTest):
"""Ensures number of failed nodes is correct."""
cmd = "grep '%s' %s | wc -l" % (node_failed_event_pattern(), survived_node.log_file)
- failed_cnt = int(str(survived_node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding()))
+ failed_cnt = int(IgniteApplicationService.exec_command(survived_node, cmd))
+
+ # Cache survivor id, do not read each time.
+ surv_id = IgniteApplicationService.node_id(survived_node)
if failed_cnt != len(failed_nodes):
- failed = str(survived_node.account.ssh_client.exec_command(
- "grep '%s' %s" % (node_failed_event_pattern(), survived_node.log_file))[1].read(),
- sys.getdefaultencoding())
+ failed = IgniteApplicationService.exec_command(survived_node, "grep '%s' %s" % (node_failed_event_pattern(),
+ survived_node.log_file))
- self.logger.warn("Node '%s' (%s) has detected the following failures:%s%s" % (
- survived_node.name, node_id(survived_node), os.linesep, failed))
+ self.logger.warn("Node '%s' (%s) has detected the following failures:%s%s" %
+ (survived_node.name, surv_id, os.linesep, failed))
raise AssertionError(
"Wrong number of failed nodes: %d. Expected: %d. Check the logs." % (failed_cnt, len(failed_nodes)))
@@ -280,18 +284,18 @@ class DiscoveryTest(IgniteTest):
for node in [srv_node for srv_node in service.nodes if srv_node not in failed_nodes]:
cmd = "grep -i '%s' %s | wc -l" % ("local node segmented", node.log_file)
- failed = str(node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding())
+ failed = IgniteApplicationService.exec_command(node, cmd)
if int(failed) > 0:
raise AssertionError(
"Wrong node failed (segmented) on '%s'. Check the logs." % node.name)
-def start_zookeeper(test_context, num_nodes, test_config):
+def start_zookeeper(test_context, num_nodes, failure_detection_timeout):
"""
Start zookeeper cluster.
"""
- zk_settings = ZookeeperSettings(min_session_timeout=test_config.failure_detection_timeout)
+ zk_settings = ZookeeperSettings(min_session_timeout=failure_detection_timeout)
zk_quorum = ZookeeperService(test_context, num_nodes, settings=zk_settings)
zk_quorum.start()
@@ -304,8 +308,7 @@ def start_servers(test_context, num_nodes, ignite_config, modules=None):
"""
servers = IgniteService(test_context, config=ignite_config, num_nodes=num_nodes, modules=modules,
# mute spam in log.
- jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"],
- startup_timeout_sec=100)
+ jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"])
start = monotonic()
servers.start()
@@ -343,13 +346,3 @@ def choose_node_to_kill(servers, nodes_to_kill, sequential):
assert len(to_kill) == nodes_to_kill, "Unable to pick up required number of nodes to kill."
return to_kill
-
-
-def order(node):
- """Return discovery order of the node."""
- return node.discovery_info().order
-
-
-def node_id(node):
- """Return node id."""
- return node.discovery_info().node_id
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 677141c..5c9fcad 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -55,3 +55,11 @@ class IgniteTest(Test):
self.logger.debug("All services killed.")
super().tearDown()
+
+ def _global_param(self, param_name, default=None):
+ """Reads global parameter passed to the test suite."""
+ return self.test_context.globals.get(param_name, default)
+
+ def _global_int(self, param_name, default: int = None):
+ """Reads global parameter passed to the test suite and converts to int."""
+ return int(self._global_param(param_name, default))