Posted to commits@ignite.apache.org by av...@apache.org on 2020/08/12 14:56:04 UTC
[ignite] branch ignite-ducktape updated: Fix measuring timers in discovery tests (#8142)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 2e4b4ad Fix measuring timers in discovery tests (#8142)
2e4b4ad is described below
commit 2e4b4addab42b02b78b03787a25795255ec5312e
Author: Vladsz83 <vl...@gmail.com>
AuthorDate: Wed Aug 12 17:55:39 2020 +0300
Fix measuring timers in discovery tests (#8142)
---
.../ducktests/tests/ignitetest/services/ignite.py | 64 ++++++-
.../tests/ignitetest/services/utils/concurrent.py | 90 ++++++++++
.../ignitetest/services/utils/ignite_aware.py | 2 +-
.../tests/ignitetest/services/utils/time_utils.py | 28 +++
.../tests/ignitetest/tests/discovery_test.py | 190 +++++++++++++++------
5 files changed, 318 insertions(+), 56 deletions(-)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index 4e2ea96..1c5a476 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,12 +17,19 @@
This module contains class to start ignite cluster node.
"""
-import os.path
+import functools
+import operator
+import os
import signal
+import time
+from datetime import datetime
+from threading import Thread
+import monotonic
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
+from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
from ignitetest.services.utils.ignite_aware import IgniteAwareService
from ignitetest.tests.utils.version import DEV_BRANCH
@@ -91,7 +98,7 @@ class IgniteService(IgniteAwareService):
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
- node.account.signal(pid, sig, allow_fail=False)
+ self.__stop_node(node, pid, sig)
try:
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec,
@@ -100,6 +107,59 @@ class IgniteService(IgniteAwareService):
self.thread_dump(node)
raise
+ def stop_nodes_async(self, nodes, delay_ms=0, clean_shutdown=True, timeout_sec=20, wait_for_stop=False):
+ """
+ Stops the nodes asynchronously, staggering each shutdown by delay_ms after the previous one.
+ :return: Tuple of (monotonic time, datetime) captured just before the first node is signalled.
+ """
+ sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+ sem = CountDownLatch(len(nodes))
+ time_holder = AtomicValue()
+
+ delay = 0
+ threads = []
+
+ for node in nodes:
+ thread = Thread(target=self.__stop_node,
+ args=(node, next(iter(self.pids(node))), sig, sem, delay, time_holder))
+
+ threads.append(thread)
+
+ thread.start()
+
+ delay += delay_ms
+
+ for thread in threads:
+ thread.join(timeout_sec)
+
+ if wait_for_stop:
+ try:
+ wait_until(lambda: len(functools.reduce(operator.iconcat, (self.pids(n) for n in nodes), [])) == 0,
+ timeout_sec=timeout_sec, err_msg="Ignite nodes failed to stop in %d seconds" % timeout_sec)
+ except Exception:
+ for node in nodes:
+ self.thread_dump(node)
+ raise
+
+ return time_holder.get()
+
+ @staticmethod
+ def __stop_node(node, pid, sig, start_waiter=None, delay_ms=0, time_holder=None):
+ if start_waiter:
+ start_waiter.count_down()
+ start_waiter.wait()
+
+ if delay_ms > 0:
+ time.sleep(delay_ms/1000.0)
+
+ if time_holder:
+ mono = monotonic.monotonic()
+ timestamp = datetime.now()
+
+ time_holder.compare_and_set(None, (mono, timestamp))
+
+ node.account.signal(pid, sig, allow_fail=False)
+
def clean_node(self, node):
node.account.kill_java_processes(self.APP_SERVICE_CLASS, clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % IgniteService.PERSISTENT_ROOT, allow_fail=False)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/concurrent.py b/modules/ducktests/tests/ignitetest/services/utils/concurrent.py
new file mode 100644
index 0000000..57768bc
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/concurrent.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains concurrent utils.
+"""
+
+import threading
+
+
+class CountDownLatch(object):
+ """
+ A count-down latch.
+ """
+ def __init__(self, count=1):
+ self.count = count
+ self.cond_var = threading.Condition()
+
+ def count_down(self):
+ """
+ Decreases the latch counter.
+ """
+ with self.cond_var:
+ if self.count > 0:
+ self.count -= 1
+ if self.count == 0:
+ self.cond_var.notify_all()
+
+ def wait(self):
+ """
+ Blocks the current thread until the latch count reaches zero.
+ """
+ with self.cond_var:
+ while self.count > 0:
+ self.cond_var.wait()
+
+
+class AtomicValue:
+ """
+ An atomic reference holder.
+ """
+ def __init__(self, value=None):
+ self.value = value
+ self.lock = threading.Lock()
+
+ def set(self, value):
+ """
+ Sets a new value to hold.
+ :param value: New value to hold.
+ """
+ with self.lock:
+ self.value = value
+
+ def get(self):
+ """
+ Returns the current value.
+ """
+ with self.lock:
+ return self.value
+
+ def compare_and_set(self, expected, value):
+ """
+ Sets a new value if the current one equals the expected value.
+ :param expected: The value to compare with.
+ :param value: New value to hold.
+ """
+ return self.check_and_set(lambda: self.value == expected, value)
+
+ def check_and_set(self, condition, value):
+ """
+ Sets a new value if the given condition holds.
+ :param condition: The condition to check.
+ :param value: New value to hold.
+ """
+ with self.lock:
+ if condition():
+ self.value = value
+ return self.value
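
A short, hedged usage sketch of the two primitives above, mirroring how stop_nodes_async() employs them: every worker counts down and then waits, so all of them pass the latch together, and compare_and_set() ensures only the first worker's value is recorded:

    from threading import Thread

    from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue

    latch = CountDownLatch(3)
    winner = AtomicValue()

    def worker(name):
        latch.count_down()                  # announce readiness
        latch.wait()                        # block until all three are ready
        winner.compare_and_set(None, name)  # only the first setter wins

    threads = [Thread(target=worker, args=("worker-%d" % i,)) for i in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    print(winner.get())  # whichever worker reached compare_and_set() first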
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 7830082..0b5e18f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -148,7 +148,7 @@ class IgniteAwareService(BackgroundThreadService):
:param evt_message: Event message.
:param node: Ignite service node.
:param timeout_sec: Number of seconds to check the condition for before failing.
- :param from_the_beginning: If True, search for message from the beggining of log file.
+ :param from_the_beginning: If True, search for message from the beginning of log file.
:param backoff_sec: Number of seconds to back off between each failure to meet the condition
before checking again.
"""
diff --git a/modules/ducktests/tests/ignitetest/services/utils/time_utils.py b/modules/ducktests/tests/ignitetest/services/utils/time_utils.py
new file mode 100644
index 0000000..01926bf
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/time_utils.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains time utils.
+"""
+
+import time
+
+
+def epoch_mills(date_time):
+ """
+ :param date_time: A datetime, interpreted as local time.
+ :return: Milliseconds since epoch of the passed datetime.
+ """
+ return int(round((time.mktime(date_time.timetuple()) + date_time.microsecond / 1e6) * 1000))
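
A quick illustration of epoch_mills() with made-up values: time.mktime() interprets the timetuple as local time, so the absolute result is timezone-dependent, but the tests only ever subtract two such values, so the offset cancels:

    from datetime import datetime

    from ignitetest.services.utils.time_utils import epoch_mills

    dt = datetime(2020, 8, 12, 17, 55, 39, 123000)
    print(epoch_mills(dt))  # e.g. 1597244139123 on a UTC+3 host

    later = datetime(2020, 8, 12, 17, 55, 41, 623000)
    print(epoch_mills(later) - epoch_mills(dt))  # 2500 (ms), timezone-independent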
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 01f6bcb..8d93396 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -18,12 +18,16 @@ Module contains discovery tests.
"""
import random
+import re
+from datetime import datetime
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from jinja2 import Template
from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.time_utils import epoch_mills
from ignitetest.services.zk.zookeeper import ZookeeperService
from ignitetest.tests.utils.ignite_test import IgniteTest
from ignitetest.tests.utils.version import DEV_BRANCH, LATEST_2_7
@@ -32,26 +36,27 @@ from ignitetest.tests.utils.version import DEV_BRANCH, LATEST_2_7
# pylint: disable=W0223
class DiscoveryTest(IgniteTest):
"""
- Test basic discovery scenarious (TCP and Zookeeper).
+ Test various node failure scenarios (TCP and ZooKeeper).
1. Start an Ignite cluster.
2. Kill one or more random nodes.
3. Wait until a surviving node detects the failure.
"""
NUM_NODES = 7
+ FAILURE_DETECTION_TIMEOUT = 2000
+
CONFIG_TEMPLATE = """
- {% if zookeeper_settings %}
+ <property name="failureDetectionTimeout" value="{{ failure_detection_timeout }}"/>
+ {% if zookeeper_settings %}
{% with zk = zookeeper_settings %}
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi">
<property name="zkConnectionString" value="{{ zk.connection_string }}"/>
- <property name="sessionTimeout" value="{{ zk.session_timeout or 3000 }}"/>
<property name="zkRootPath" value="{{ zk.root_path or '/apacheIgnite' }}"/>
- <property name="joinTimeout" value="{{ zk.join_timeout or 10000 }}"/>
</bean>
</property>
{% endwith %}
- {% endif %}
+ {% endif %}
"""
def __init__(self, test_context):
@@ -59,14 +64,28 @@ class DiscoveryTest(IgniteTest):
self.zk_quorum = None
self.servers = None
+ def __start_zk_quorum(self):
+ self.zk_quorum = ZookeeperService(self.test_context, 3)
+
+ self.stage("Starting ZooKeeper quorum")
+
+ self.zk_quorum.start()
+
+ self.stage("ZooKeeper quorum started")
+
@staticmethod
- def properties(zookeeper_settings=None):
+ def __properties(zookeeper_settings=None):
"""
- :param zookeeper_settings: ZookeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
+ :param zookeeper_settings: ZookeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
:return: Rendered node's properties.
"""
return Template(DiscoveryTest.CONFIG_TEMPLATE) \
- .render(zookeeper_settings=zookeeper_settings)
+ .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
+ zookeeper_settings=zookeeper_settings)
+
+ @staticmethod
+ def __zk_properties(connection_string):
+ return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
def setUp(self):
pass
@@ -81,31 +100,68 @@ class DiscoveryTest(IgniteTest):
@cluster(num_nodes=NUM_NODES)
@parametrize(version=str(DEV_BRANCH))
@parametrize(version=str(LATEST_2_7))
- def test_tcp(self, version):
+ def test_tcp_not_coordinator_single(self, version):
+ """
+ Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
+ """
+ return self.__simulate_nodes_failure(version, self.__properties(), 1)
+
+ @cluster(num_nodes=NUM_NODES)
+ @parametrize(version=str(DEV_BRANCH))
+ @parametrize(version=str(LATEST_2_7))
+ def test_tcp_not_coordinator_two(self, version):
"""
- Test basic discovery scenario with TcpDiscoverySpi.
+ Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
"""
- return self.__basic_test__(version, False)
+ return self.__simulate_nodes_failure(version, self.__properties(), 2)
+
+ @cluster(num_nodes=NUM_NODES)
+ @parametrize(version=str(DEV_BRANCH))
+ @parametrize(version=str(LATEST_2_7))
+ def test_tcp_coordinator(self, version):
+ """
+ Test coordinator-failure scenario with TcpDiscoverySpi.
+ """
+ return self.__simulate_nodes_failure(version, self.__properties(), 0)
@cluster(num_nodes=NUM_NODES + 3)
@parametrize(version=str(DEV_BRANCH))
@parametrize(version=str(LATEST_2_7))
- def test_zk(self, version):
+ def test_zk_not_coordinator_single(self, version):
"""
- Test basic discovery scenario with ZookeeperDiscoverySpi.
+ Test single-node-failure scenario (not the coordinator) with ZooKeeper.
"""
- return self.__basic_test__(version, True)
+ self.__start_zk_quorum()
- def __basic_test__(self, version, with_zk=False):
- if with_zk:
- self.zk_quorum = ZookeeperService(self.test_context, 3)
- self.stage("Starting Zookeper quorum")
- self.zk_quorum.start()
- properties = self.properties(zookeeper_settings={'connection_string': self.zk_quorum.connection_string()})
- self.stage("Zookeper quorum started")
- else:
- properties = self.properties()
+ return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
+
+ @cluster(num_nodes=NUM_NODES + 3)
+ @parametrize(version=str(DEV_BRANCH))
+ @parametrize(version=str(LATEST_2_7))
+ def test_zk_not_coordinator_two(self, version):
+ """
+ Test two-node-failure scenario (not the coordinator) with ZooKeeper.
+ """
+ self.__start_zk_quorum()
+
+ return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
+
+ @cluster(num_nodes=NUM_NODES+3)
+ @parametrize(version=str(DEV_BRANCH))
+ @parametrize(version=str(LATEST_2_7))
+ def test_zk_coordinator(self, version):
+ """
+ Test coordinator-failure scenario with ZooKeeper.
+ """
+ self.__start_zk_quorum()
+ return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+
+ def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
+ """
+ :param nodes_to_kill: How many nodes to kill. If less than 1, the coordinator is killed. Otherwise, the given
+ number of non-coordinator nodes is killed.
+ """
self.servers = IgniteService(
self.test_context,
num_nodes=self.NUM_NODES,
@@ -115,48 +171,76 @@ class DiscoveryTest(IgniteTest):
self.stage("Starting ignite cluster")
- start = self.monotonic()
+ time_holder = self.monotonic()
+
self.servers.start()
- data = {'Ignite cluster start time (s)': self.monotonic() - start}
+
+ if nodes_to_kill > self.servers.num_nodes - 1:
+ raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
+
+ data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
self.stage("Topology is ready")
- # Node failure detection
- fail_node, survived_node = self.choose_random_node_to_kill(self.servers)
+ failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
- data["nodes"] = [node.node_id() for node in self.servers.nodes]
+ ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
- disco_infos = []
- for node in self.servers.nodes:
- disco_info = node.discovery_info()
- disco_infos.append({
- "id": disco_info.node_id,
- "consistent_id": disco_info.consistent_id,
- "coordinator": disco_info.coordinator,
- "order": disco_info.order,
- "int_order": disco_info.int_order,
- "is_client": disco_info.is_client
- })
+ self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
- data["node_disco_info"] = disco_infos
+ first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
- self.servers.stop_node(fail_node, clean_shutdown=False)
+ self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
- start = self.monotonic()
- self.servers.await_event_on_node("Node FAILED", random.choice(survived_node), 60, True)
+ # Keeps timestamps of the logged node failures.
+ logged_timestamps = []
- data['Failure of node detected in time (s)'] = self.monotonic() - start
+ for failed_id in ids_to_wait:
+ self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
+ from_the_beginning=True, backoff_sec=0.01)
+ # Save monotonic time of the last detected failure.
+ time_holder = self.monotonic()
+ self.stage("Failure detection measured.")
+
+ for failed_id in ids_to_wait:
+ _, stdout, _ = survived_node.account.ssh_client.exec_command(
+ "grep '%s' %s" % (self.__failed_pattern(failed_id), IgniteAwareService.STDOUT_STDERR_CAPTURE))
+
+ logged_timestamps.append(
+ datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read()).group(), "[%Y-%m-%d %H:%M:%S,%f]"))
+
+ logged_timestamps.sort(reverse=True)
+
+ # Failure detection delay, measured on the test driver (ms).
+ time_holder = int((time_holder - first_terminated[0]) * 1000)
+ # Failure detection delay derived from the node log (ms).
+ by_log = epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1])
+
+ assert by_log > 0, "Negative node failure detection delay: " + str(by_log) + ". Probably a timezone issue."
+ assert by_log <= time_holder, "Node failure detection delay taken from the node log (" + \
+ str(by_log) + "ms) must not exceed the measured value (" + str(time_holder) + \
+ "ms) because watching for this event consumes extra time."
+
+ data['Detection of node(s) failure, measured (ms)'] = time_holder
+ data['Detection of node(s) failure, by the log (ms)'] = by_log
+ data['Nodes failed'] = len(failed_nodes)
return data
@staticmethod
- def choose_random_node_to_kill(service):
- """
- :param service: Service nodes to process.
- :return: Tuple of random node to kill and survived nodes
- """
- idx = random.randint(0, len(service.nodes) - 1)
+ def __failed_pattern(failed_node_id):
+ return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
+
+ def __choose_node_to_kill(self, nodes_to_kill):
+ nodes = self.servers.nodes
+ coordinator = nodes[0].discovery_info().coordinator
+
+ if nodes_to_kill < 1:
+ to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
+ else:
+ to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+
+ to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
- survive = [node for i, node in enumerate(service.nodes) if i != idx]
- kill = service.nodes[idx]
+ survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
- return kill, survive
+ return to_kill, survive
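
In summary, the reworked test derives the failure-detection delay two ways and cross-checks them, exactly as the asserts in __simulate_nodes_failure do. A condensed, runnable restatement with made-up numbers:

    from datetime import datetime

    from ignitetest.services.utils.time_utils import epoch_mills

    # Stand-ins for what stop_nodes_async() and the log scan return.
    first_terminated = (1000.000, datetime(2020, 8, 12, 17, 55, 39, 100000))
    detected_mono = 1001.250                            # monotonic() after the last "Node FAILED"
    logged = datetime(2020, 8, 12, 17, 55, 40, 250000)  # newest failure timestamp in the log

    # Delay measured on the test driver, via the monotonic clock.
    measured = int((detected_mono - first_terminated[0]) * 1000)     # 1250 ms

    # Delay derived from the node's own log, via wall-clock timestamps.
    by_log = epoch_mills(logged) - epoch_mills(first_terminated[1])  # 1150 ms

    # The log-based delay must be positive and can only be smaller, since
    # waiting for the event to appear in the log costs extra time.
    assert 0 < by_log <= measured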