Posted to commits@ignite.apache.org by av...@apache.org on 2020/08/12 14:56:04 UTC

[ignite] branch ignite-ducktape updated: Fix measuring timers in discovery tests (#8142)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 2e4b4ad  Fix measuring timers in discovery tests (#8142)
2e4b4ad is described below

commit 2e4b4addab42b02b78b03787a25795255ec5312e
Author: Vladsz83 <vl...@gmail.com>
AuthorDate: Wed Aug 12 17:55:39 2020 +0300

    Fix measuring timers in discovery tests (#8142)
---
 .../ducktests/tests/ignitetest/services/ignite.py  |  64 ++++++-
 .../tests/ignitetest/services/utils/concurrent.py  |  90 ++++++++++
 .../ignitetest/services/utils/ignite_aware.py      |   2 +-
 .../tests/ignitetest/services/utils/time_utils.py  |  28 +++
 .../tests/ignitetest/tests/discovery_test.py       | 190 +++++++++++++++------
 5 files changed, 318 insertions(+), 56 deletions(-)

diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index 4e2ea96..1c5a476 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,12 +17,19 @@
 This module contains class to start ignite cluster node.
 """
 
-import os.path
+import functools
+import operator
+import os
 import signal
+import time
+from datetime import datetime
+from threading import Thread
 
+import monotonic
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
 
+from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
 from ignitetest.services.utils.ignite_aware import IgniteAwareService
 from ignitetest.tests.utils.version import DEV_BRANCH
 
@@ -91,7 +98,7 @@ class IgniteService(IgniteAwareService):
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
         for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
+            self.__stop_node(node, pid, sig)
 
         try:
             wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec,
@@ -100,6 +107,59 @@ class IgniteService(IgniteAwareService):
             self.thread_dump(node)
             raise
 
+    def stop_nodes_async(self, nodes, delay_ms=0, clean_shutdown=True, timeout_sec=20, wait_for_stop=False):
+        """
+        Stops the nodes asynchronously.
+        """
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        sem = CountDownLatch(len(nodes))
+        time_holder = AtomicValue()
+
+        delay = 0
+        threads = []
+
+        for node in nodes:
+            thread = Thread(target=self.__stop_node,
+                            args=(node, next(iter(self.pids(node))), sig, sem, delay, time_holder))
+
+            threads.append(thread)
+
+            thread.start()
+
+            delay += delay_ms
+
+        for thread in threads:
+            thread.join(timeout_sec)
+
+        if wait_for_stop:
+            try:
+                wait_until(lambda: len(functools.reduce(operator.iconcat, (self.pids(n) for n in nodes), [])) == 0,
+                           timeout_sec=timeout_sec, err_msg="Ignite node failed to stop in %d seconds" % timeout_sec)
+            except Exception:
+                for node in nodes:
+                    self.thread_dump(node)
+                raise
+
+        return time_holder.get()
+
+    @staticmethod
+    def __stop_node(node, pid, sig, start_waiter=None, delay_ms=0, time_holder=None):
+        if start_waiter:
+            start_waiter.count_down()
+            start_waiter.wait()
+
+        if delay_ms > 0:
+            time.sleep(delay_ms/1000.0)
+
+        if time_holder:
+            mono = monotonic.monotonic()
+            timestamp = datetime.now()
+
+            time_holder.compare_and_set(None, (mono, timestamp))
+
+        node.account.signal(pid, sig, False)
+
     def clean_node(self, node):
         node.account.kill_java_processes(self.APP_SERVICE_CLASS, clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % IgniteService.PERSISTENT_ROOT, allow_fail=False)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/concurrent.py b/modules/ducktests/tests/ignitetest/services/utils/concurrent.py
new file mode 100644
index 0000000..57768bc
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/concurrent.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains concurrent utils.
+"""
+
+import threading
+
+
+class CountDownLatch(object):
+    """
+    A count-down latch.
+    """
+    def __init__(self, count=1):
+        self.count = count
+        self.cond_var = threading.Condition()
+
+    def count_down(self):
+        """
+        Decreases the latch counter.
+        """
+        with self.cond_var:
+            if self.count > 0:
+                self.count -= 1
+            if self.count == 0:
+                self.cond_var.notifyAll()
+
+    def wait(self):
+        """
+        Blocks the current thread until the latch counter reaches zero.
+        """
+        with self.cond_var:
+            while self.count > 0:
+                self.cond_var.wait()
+
+
+class AtomicValue:
+    """
+    An atomic reference holder.
+    """
+    def __init__(self, value=None):
+        self.value = value
+        self.lock = threading.Lock()
+
+    def set(self, value):
+        """
+        Sets new value to hold.
+        :param value: New value to hold.
+        """
+        with self.lock:
+            self.value = value
+
+    def get(self):
+        """
+        Gives current value.
+        """
+        with self.lock:
+            return self.value
+
+    def compare_and_set(self, expected, value):
+        """
+        Sets new value to hold if current one equals expected.
+        :param expected: The value to compare with.
+        :param value: New value to hold.
+        """
+        return self.check_and_set(lambda: self.value == expected, value)
+
+    def check_and_set(self, condition, value):
+        """
+        Sets new value to hold by condition.
+        :param condition: The condition to check.
+        :param value: New value to hold.
+        """
+        with self.lock:
+            if condition():
+                self.value = value
+            return self.value
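
CountDownLatch releases every waiter once its counter reaches zero, and AtomicValue.compare_and_set() stores a new value only while the current one still equals the expected one, so the first of several racing threads wins. A small standalone sketch of how the two primitives combine (independent of the Ignite services):

    import threading

    from ignitetest.services.utils.concurrent import AtomicValue, CountDownLatch

    latch = CountDownLatch(3)
    winner = AtomicValue()

    def worker(idx):
        latch.count_down()
        latch.wait()                        # all workers resume together
        winner.compare_and_set(None, idx)   # only the first writer succeeds

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    print(winner.get())  # index of whichever worker stored its value first
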
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 7830082..0b5e18f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -148,7 +148,7 @@ class IgniteAwareService(BackgroundThreadService):
         :param evt_message: Event message.
         :param node: Ignite service node.
         :param timeout_sec: Number of seconds to check the condition for before failing.
-        :param from_the_beginning: If True, search for message from the beggining of log file.
+        :param from_the_beginning: If True, search for message from the beginning of log file.
         :param backoff_sec: Number of seconds to back off between each failure to meet the condition
                 before checking again.
         """
diff --git a/modules/ducktests/tests/ignitetest/services/utils/time_utils.py b/modules/ducktests/tests/ignitetest/services/utils/time_utils.py
new file mode 100644
index 0000000..01926bf
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/time_utils.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains time utils.
+"""
+
+import time
+
+
+def epoch_mills(date_time):
+    """
+    :param date_time: a datetime.
+    :return: milliseconds since epoch of passed datetime.
+    """
+    return int(round((time.mktime(date_time.timetuple()) + date_time.microsecond / 1e6) * 1000))
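
epoch_mills() converts a naive, local-time datetime into milliseconds since the Unix epoch via time.mktime(), which is also why the discovery test below treats a negative delay as a probable timezone issue. A quick sketch pairing it with a timestamp parsed from an Ignite log line (the literal timestamp is made up):

    from datetime import datetime

    from ignitetest.services.utils.time_utils import epoch_mills

    logged = datetime.strptime("[2020-08-12 17:55:39,123]", "[%Y-%m-%d %H:%M:%S,%f]")
    delay_ms = epoch_mills(datetime.now()) - epoch_mills(logged)
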
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 01f6bcb..8d93396 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -18,12 +18,16 @@ Module contains discovery tests.
 """
 
 import random
+import re
+from datetime import datetime
 
 from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 from jinja2 import Template
 
 from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.utils.time_utils import epoch_mills
 from ignitetest.services.zk.zookeeper import ZookeeperService
 from ignitetest.tests.utils.ignite_test import IgniteTest
 from ignitetest.tests.utils.version import DEV_BRANCH, LATEST_2_7
@@ -32,26 +36,27 @@ from ignitetest.tests.utils.version import DEV_BRANCH, LATEST_2_7
 # pylint: disable=W0223
 class DiscoveryTest(IgniteTest):
     """
-    Test basic discovery scenarious (TCP and Zookeeper).
+    Test various node failure scenarios (TCP and ZooKeeper).
     1. Start of ignite cluster.
     2. Kill random node.
     3. Wait that survived node detects node failure.
     """
     NUM_NODES = 7
 
+    FAILURE_DETECTION_TIMEOUT = 2000
+
     CONFIG_TEMPLATE = """
-        {% if zookeeper_settings %}
+    <property name="failureDetectionTimeout" value="{{ failure_detection_timeout }}"/>
+    {% if zookeeper_settings %}
         {% with zk = zookeeper_settings %}
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi">
                 <property name="zkConnectionString" value="{{ zk.connection_string }}"/>
-                <property name="sessionTimeout" value="{{ zk.session_timeout or 3000 }}"/>
                 <property name="zkRootPath" value="{{ zk.root_path or '/apacheIgnite' }}"/>
-                <property name="joinTimeout" value="{{ zk.join_timeout or 10000 }}"/>
             </bean>
         </property>
         {% endwith %}
-        {% endif %}
+    {% endif %}
     """
 
     def __init__(self, test_context):
@@ -59,14 +64,28 @@ class DiscoveryTest(IgniteTest):
         self.zk_quorum = None
         self.servers = None
 
+    def __start_zk_quorum(self):
+        self.zk_quorum = ZookeeperService(self.test_context, 3)
+
+        self.stage("Starting ZooKeeper quorum")
+
+        self.zk_quorum.start()
+
+        self.stage("ZooKeeper quorum started")
+
     @staticmethod
-    def properties(zookeeper_settings=None):
+    def __properties(zookeeper_settings=None):
         """
-        :param zookeeper_settings: ZookeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
+        :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
         :return: Rendered node's properties.
         """
         return Template(DiscoveryTest.CONFIG_TEMPLATE) \
-            .render(zookeeper_settings=zookeeper_settings)
+            .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
+                    zookeeper_settings=zookeeper_settings)
+
+    @staticmethod
+    def __zk_properties(connection_string):
+        return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
 
     def setUp(self):
         pass
@@ -81,31 +100,68 @@ class DiscoveryTest(IgniteTest):
     @cluster(num_nodes=NUM_NODES)
     @parametrize(version=str(DEV_BRANCH))
     @parametrize(version=str(LATEST_2_7))
-    def test_tcp(self, version):
+    def test_tcp_not_coordinator_single(self, version):
+        """
+        Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
+        """
+        return self.__simulate_nodes_failure(version, self.__properties(), 1)
+
+    @cluster(num_nodes=NUM_NODES)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST_2_7))
+    def test_tcp_not_coordinator_two(self, version):
         """
-        Test basic discovery scenario with TcpDiscoverySpi.
+        Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
         """
-        return self.__basic_test__(version, False)
+        return self.__simulate_nodes_failure(version, self.__properties(), 2)
+
+    @cluster(num_nodes=NUM_NODES)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST_2_7))
+    def test_tcp_coordinator(self, version):
+        """
+        Test coordinator-failure scenario with TcpDiscoverySpi.
+        """
+        return self.__simulate_nodes_failure(version, self.__properties(), 0)
 
     @cluster(num_nodes=NUM_NODES + 3)
     @parametrize(version=str(DEV_BRANCH))
     @parametrize(version=str(LATEST_2_7))
-    def test_zk(self, version):
+    def test_zk_not_coordinator_single(self, version):
         """
-        Test basic discovery scenario with ZookeeperDiscoverySpi.
+        Test single-node-failure scenario (not the coordinator) with ZooKeeper.
         """
-        return self.__basic_test__(version, True)
+        self.__start_zk_quorum()
 
-    def __basic_test__(self, version, with_zk=False):
-        if with_zk:
-            self.zk_quorum = ZookeeperService(self.test_context, 3)
-            self.stage("Starting Zookeper quorum")
-            self.zk_quorum.start()
-            properties = self.properties(zookeeper_settings={'connection_string': self.zk_quorum.connection_string()})
-            self.stage("Zookeper quorum started")
-        else:
-            properties = self.properties()
+        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
+
+    @cluster(num_nodes=NUM_NODES + 3)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST_2_7))
+    def test_zk_not_coordinator_two(self, version):
+        """
+        Test two-node-failure scenario (not the coordinator) with ZooKeeper.
+        """
+        self.__start_zk_quorum()
+
+        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
+
+    @cluster(num_nodes=NUM_NODES+3)
+    @parametrize(version=str(DEV_BRANCH))
+    @parametrize(version=str(LATEST_2_7))
+    def test_zk_coordinator(self, version):
+        """
+        Test coordinator-failure scenario with ZooKeeper.
+        """
+        self.__start_zk_quorum()
 
+        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+
+    def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
+        """
+        :param nodes_to_kill: How many nodes to kill. If < 1, the coordinator is killed; otherwise, the given
+        number of randomly chosen non-coordinator nodes is killed.
+        """
         self.servers = IgniteService(
             self.test_context,
             num_nodes=self.NUM_NODES,
@@ -115,48 +171,76 @@ class DiscoveryTest(IgniteTest):
 
         self.stage("Starting ignite cluster")
 
-        start = self.monotonic()
+        time_holder = self.monotonic()
+
         self.servers.start()
-        data = {'Ignite cluster start time (s)': self.monotonic() - start}
+
+        if nodes_to_kill > self.servers.num_nodes - 1:
+            raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
+
+        data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
         self.stage("Topology is ready")
 
-        # Node failure detection
-        fail_node, survived_node = self.choose_random_node_to_kill(self.servers)
+        failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
 
-        data["nodes"] = [node.node_id() for node in self.servers.nodes]
+        ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
 
-        disco_infos = []
-        for node in self.servers.nodes:
-            disco_info = node.discovery_info()
-            disco_infos.append({
-                "id": disco_info.node_id,
-                "consistent_id": disco_info.consistent_id,
-                "coordinator": disco_info.coordinator,
-                "order": disco_info.order,
-                "int_order": disco_info.int_order,
-                "is_client": disco_info.is_client
-            })
+        self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
 
-        data["node_disco_info"] = disco_infos
+        first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
 
-        self.servers.stop_node(fail_node, clean_shutdown=False)
+        self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
 
-        start = self.monotonic()
-        self.servers.await_event_on_node("Node FAILED", random.choice(survived_node), 60, True)
+        # Keeps dates of logged node failures.
+        logged_timestamps = []
 
-        data['Failure of node detected in time (s)'] = self.monotonic() - start
+        for failed_id in ids_to_wait:
+            self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
+                                             from_the_beginning=True, backoff_sec=0.01)
+            # Save the monotonic time of the last detected failure.
+            time_holder = self.monotonic()
+            self.stage("Failure detection measured.")
+
+        for failed_id in ids_to_wait:
+            _, stdout, _ = survived_node.account.ssh_client.exec_command(
+                "grep '%s' %s" % (self.__failed_pattern(failed_id), IgniteAwareService.STDOUT_STDERR_CAPTURE))
+
+            logged_timestamps.append(
+                datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read()).group(), "[%Y-%m-%d %H:%M:%S,%f]"))
+
+        logged_timestamps.sort(reverse=True)
+
+        # Failure detection delay.
+        time_holder = int((time_holder - first_terminated[0]) * 1000)
+        # Failure detection delay by log.
+        by_log = epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1])
+
+        assert by_log > 0, "Negative node failure detection delay: %d. Probably it is a timezone issue." % by_log
+        assert by_log <= time_holder, "Node failure detection delay taken from the node log (" + \
+                                      str(by_log) + " ms) must not exceed the measured value (" + str(time_holder) + \
+                                      " ms) because watching for this event consumes extra time."
+
+        data['Detection of node(s) failure, measured (ms)'] = time_holder
+        data['Detection of node(s) failure, by the log (ms)'] = by_log
+        data['Nodes failed'] = len(failed_nodes)
 
         return data
 
     @staticmethod
-    def choose_random_node_to_kill(service):
-        """
-        :param service: Service nodes to process.
-        :return: Tuple of random node to kill and survived nodes
-        """
-        idx = random.randint(0, len(service.nodes) - 1)
+    def __failed_pattern(failed_node_id):
+        return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
+
+    def __choose_node_to_kill(self, nodes_to_kill):
+        nodes = self.servers.nodes
+        coordinator = nodes[0].discovery_info().coordinator
+
+        if nodes_to_kill < 1:
+            to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
+        else:
+            to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+
+        to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
 
-        survive = [node for i, node in enumerate(service.nodes) if i != idx]
-        kill = service.nodes[idx]
+        survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
 
-        return kill, survive
+        return to_kill, survive
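
The test now reports two delay figures: one measured on the monotonic clock, from the first kill to the moment await_event_on_node() returned for the last failed node, and one reconstructed from the survivor's log timestamps. Because watching the log adds latency, the log-based figure should stay positive and never exceed the measured one. A self-contained arithmetic sketch with illustrative values (not taken from a real run):

    from datetime import datetime, timedelta

    from ignitetest.services.utils.time_utils import epoch_mills

    # Illustrative values: the first node is killed at kill_mono / kill_wall_clock,
    # the survivor logs "Node FAILED" 1.8 s later, and the test notices that log
    # line another 0.2 s after it is written.
    kill_mono, kill_wall_clock = 1000.0, datetime(2020, 8, 12, 17, 55, 39)
    logged_at = kill_wall_clock + timedelta(milliseconds=1800)
    detected_mono = kill_mono + 2.0

    measured_ms = int((detected_mono - kill_mono) * 1000)              # 2000
    by_log_ms = epoch_mills(logged_at) - epoch_mills(kill_wall_clock)  # 1800
    assert 0 < by_log_ms <= measured_ms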