Posted to commits@ignite.apache.org by av...@apache.org on 2020/08/26 09:37:26 UTC

[ignite] branch ignite-ducktape updated: Loading in discovery tests. (#8159)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new c6f5504  Loading in discovery tests. (#8159)
c6f5504 is described below

commit c6f55041ad4033d94cda759995c4245192d8bad4
Author: Vladsz83 <vl...@gmail.com>
AuthorDate: Wed Aug 26 12:36:58 2020 +0300

    Loading in discovery tests. (#8159)
---
 .../tests/ContinuousDataLoadApplication.java       |  74 +++++++
 .../ducktest/utils/IgniteAwareApplication.java     |  25 ++-
 .../tests/ignitetest/services/utils/ignite_spec.py |   1 +
 .../tests/ignitetest/tests/discovery_test.py       | 225 ++++++++++-----------
 4 files changed, 203 insertions(+), 122 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
new file mode 100644
index 0000000..46a4b76
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests;
+
+import java.util.Random;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Keeps loading data until stopped.
+ */
+public class ContinuousDataLoadApplication extends IgniteAwareApplication {
+    /** Logger. */
+    private static final Logger log = LogManager.getLogger(ContinuousDataLoadApplication.class.getName());
+
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) {
+        String cacheName = jsonNode.get("cacheName").asText();
+        int range = jsonNode.get("range").asInt();
+
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+
+        int warmUpCnt = (int)Math.max(1, 0.1f * range);
+
+        Random rnd = new Random();
+
+        long streamed = 0;
+
+        log.info("Generating data in background...");
+
+        long notifyTime = System.nanoTime();
+
+        while (active()) {
+            cache.put(rnd.nextInt(range), rnd.nextInt(range));
+
+            streamed++;
+
+            if (notifyTime + U.millisToNanos(1500) < System.nanoTime()) {
+                notifyTime = System.nanoTime();
+
+                if (log.isDebugEnabled())
+                    log.debug("Streamed " + streamed + " entries.");
+            }
+
+            // Delay the initialization notification to make sure the data load has actually begun
+            // and has produced a meaningful amount of data.
+            if (!inited() && warmUpCnt == streamed)
+                markInitialized();
+        }
+
+        log.info("Background data generation finished.");
+
+        markFinished();
+    }
+}
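
For reference, the application above is driven from the Python side with two JSON parameters, "cacheName" and "range"; it only reports initialization after a warm-up of roughly 10% of the range has been streamed. A minimal ducktape sketch of launching it (mirroring the __start_loading helper added in discovery_test.py below; test_context, ignite_version, modules and properties are assumed to come from the surrounding test):

    from ignitetest.services.ignite_app import IgniteApplicationService

    # Start the continuous loader against the running cluster; it keeps
    # putting random integers into "test-cache" until the service is stopped.
    loader = IgniteApplicationService(
        test_context,
        java_class_name="org.apache.ignite.internal.ducktest.tests.ContinuousDataLoadApplication",
        version=ignite_version,
        modules=modules,
        properties=properties,
        params={"cacheName": "test-cache", "range": 100000})

    loader.start()  # expected to block until the app calls markInitialized()
    # ... run the failure scenario under load ...
    loader.stop()   # SIGTERM fires the shutdown hook; run() exits via active()
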
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index 5e610f1..107787b 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -76,8 +76,11 @@ public abstract class IgniteAwareApplication {
             else
                 log.info("Application already done [finished=" + finished + ", broken=" + broken + "]");
 
+            if (log.isDebugEnabled())
+                log.debug("Waiting for graceful termination...");
+
             while (!finished && !broken) {
-                log.info("Waiting for graceful termnation.");
+                log.info("Waiting for graceful termination cycle...");
 
                 try {
                     U.sleep(100);
@@ -86,6 +89,9 @@ public abstract class IgniteAwareApplication {
                     e.printStackTrace();
                 }
             }
+
+            if (log.isDebugEnabled())
+                log.debug("Graceful termination done.");
         }));
 
         log.info("ShutdownHook registered.");
@@ -111,7 +117,8 @@ public abstract class IgniteAwareApplication {
 
         log.info(APP_FINISHED);
 
-        removeShutdownHook();
+        if (!terminated())
+            removeShutdownHook();
 
         finished = true;
     }
@@ -166,6 +173,20 @@ public abstract class IgniteAwareApplication {
     }
 
     /**
+     * @return {@code true} if the application has been marked as initialized.
+     */
+    protected boolean inited() {
+        return inited;
+    }
+
+    /**
+     * @return {@code true} while the application is neither terminated, broken, nor finished.
+     */
+    protected boolean active() {
+        return !(terminated || broken || finished);
+    }
+
+    /**
      * @param name Name.
      * @param val Value.
      */
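
The net effect of the IgniteAwareApplication changes is a cooperative termination protocol: run() loops on active(), and the shutdown hook spins until the loop calls markFinished() (or the app breaks). From the test side the distinction between a clean stop and a hard kill therefore matters; a rough sketch (assuming ducktape's default stop sends SIGTERM):

    # Clean stop: SIGTERM runs the JVM shutdown hook added above, which waits
    # for run() to observe active() == False and call markFinished().
    loader.stop()

    # Hard kill, as the discovery test does to the failed nodes: no hook runs,
    # so the surviving nodes must detect the failure via discovery itself.
    servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
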
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index f992e3e..9cbfcc0 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -190,6 +190,7 @@ class ApacheIgniteApplicationSpec(IgniteApplicationSpec, IgnitePersistenceAware)
         self.jvm_opts.extend([
             "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file ",
             "-Dlog4j.configDebug=true",
+            "-DIGNITE_NO_SHUTDOWN_HOOK=true",  # allows to perform operations on app termination.
             "-Xmx1G",
             "-ea",
             "-DIGNITE_ALLOW_ATOMIC_OPS_IN_TX=false"
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 8aec38f..63f6ef8 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -21,16 +21,17 @@ import random
 import re
 from datetime import datetime
 
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from jinja2 import Template
 
+from ignitetest.services.ignite import IgniteAwareService
 from ignitetest.services.ignite import IgniteService
-from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.ignite_app import IgniteApplicationService
 from ignitetest.services.utils.time_utils import epoch_mills
 from ignitetest.services.zk.zookeeper import ZookeeperService
 from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, LATEST_2_7
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8
 
 
 # pylint: disable=W0223
@@ -41,10 +42,21 @@ class DiscoveryTest(IgniteTest):
     2. Kill a random node.
     3. Wait until a surviving node detects the failure.
     """
+    class Config:
+        """
+        Configuration for DiscoveryTest.
+        """
+        def __init__(self, nodes_to_kill=1, kill_coordinator=False, with_load=False):
+            self.nodes_to_kill = nodes_to_kill
+            self.kill_coordinator = kill_coordinator
+            self.with_load = with_load
+
     NUM_NODES = 7
 
     FAILURE_DETECTION_TIMEOUT = 2000
 
+    DATA_AMOUNT = 100000
+
     CONFIG_TEMPLATE = """
     <property name="failureDetectionTimeout" value="{{ failure_detection_timeout }}"/>
     {% if zookeeper_settings %}
@@ -63,145 +75,85 @@ class DiscoveryTest(IgniteTest):
         super().__init__(test_context=test_context)
         self.zk_quorum = None
         self.servers = None
+        self.loader = None
 
-    def __start_zk_quorum(self):
-        self.zk_quorum = ZookeeperService(self.test_context, 3)
-
-        self.stage("Starting ZooKeeper quorum")
-
-        self.zk_quorum.start()
+    @cluster(num_nodes=NUM_NODES)
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[1, 2],
+            with_load=[False, True])
+    def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
+        """
+        Test node failure scenarios with TcpDiscoverySpi.
+        """
+        config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
 
-        self.stage("ZooKeeper quorum started")
+        return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, config)
 
-    @staticmethod
-    def __properties(zookeeper_settings=None):
+    @cluster(num_nodes=NUM_NODES + 3)
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[1, 2],
+            with_load=[False, True])
+    def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
         """
-        :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
-        :return: Rendered node's properties.
+        Test node failure scenarios with ZooKeeperDiscoverySpi.
         """
-        return Template(DiscoveryTest.CONFIG_TEMPLATE) \
-            .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
-                    zookeeper_settings=zookeeper_settings)
+        config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
 
-    @staticmethod
-    def __zk_properties(connection_string):
-        return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
+        self.__start_zk_quorum()
+
+        properties = self.__zk_properties(self.zk_quorum.connection_string())
+        modules = ["zookeeper"]
+
+        return self.__simulate_nodes_failure(ignite_version, properties, modules, config)
 
     def setUp(self):
         pass
 
     def teardown(self):
-        if self.zk_quorum:
-            self.zk_quorum.stop()
+        if self.loader:
+            self.loader.stop()
 
         if self.servers:
             self.servers.stop()
 
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_single(self, version):
-        """
-        Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_two(self, version):
-        """
-        Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 2)
-
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
-    @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_single(self, version):
-        """
-        Test single node failure scenario (not the coordinator) with ZooKeeper.
-        """
-        self.__start_zk_quorum()
-
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
-
-    @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_two(self, version):
-        """
-        Test two-node-failure scenario (not the coordinator) with ZooKeeper.
-        """
-        self.__start_zk_quorum()
-
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
-
-    @cluster(num_nodes=NUM_NODES+3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with ZooKeeper.
-        """
-        self.__start_zk_quorum()
+        if self.zk_quorum:
+            self.zk_quorum.stop()
 
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+    def __simulate_nodes_failure(self, version, properties, modules, config):
+        if config.nodes_to_kill < 1:
+            return {"No nodes to kill": "Nothing to do"}
 
-    def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
-        """
-        :param nodes_to_kill: How many nodes to kill. If <1, the coordinator is the choice. Otherwise: not-coordinator
-        nodes of given number.
-        """
         self.servers = IgniteService(
             self.test_context,
-            num_nodes=self.NUM_NODES,
-            modules=["zookeeper"],
+            num_nodes=self.NUM_NODES - 1,
+            modules=modules,
             properties=properties,
             version=version)
 
-        self.stage("Starting ignite cluster")
-
         time_holder = self.monotonic()
 
         self.servers.start()
 
-        if nodes_to_kill > self.servers.num_nodes - 1:
-            raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
-
         data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
-        self.stage("Topology is ready")
 
-        failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
+        failed_nodes, survived_node = self.__choose_node_to_kill(config.kill_coordinator, config.nodes_to_kill)
 
         ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
 
-        self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
+        if config.with_load:
+            self.__start_loading(version, properties, modules)
 
         first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
 
-        self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
-
         # Keeps dates of logged node failures.
         logged_timestamps = []
 
         for failed_id in ids_to_wait:
-            self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
-                                             from_the_beginning=True, backoff_sec=0.01)
-            # Save mono of last detected failure.
-            time_holder = self.monotonic()
-            self.stage("Failure detection measured.")
+            self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 20,
+                                             from_the_beginning=True, backoff_sec=0.1)
 
-        for failed_id in ids_to_wait:
             _, stdout, _ = survived_node.account.ssh_client.exec_command(
                 "grep '%s' %s" % (self.__failed_pattern(failed_id), IgniteAwareService.STDOUT_STDERR_CAPTURE))
 
@@ -211,37 +163,70 @@ class DiscoveryTest(IgniteTest):
 
         logged_timestamps.sort(reverse=True)
 
-        # Failure detection delay.
-        time_holder = int((time_holder - first_terminated[0]) * 1000)
-        # Failure detection delay by log.
-        by_log = epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1])
-
-        assert by_log > 0, "Negative node failure detection delay: " + by_log + ". Probably it is a timezone issue."
-        assert by_log <= time_holder, "Value of node failure detection delay taken from by the node log (" + \
-                                      str(by_log) + "ms) must be lesser than measured value (" + str(time_holder) + \
-                                      "ms) because watching this event consumes extra time."
+        self.__store_results(data, logged_timestamps, first_terminated[1])
 
-        data['Detection of node(s) failure, measured (ms)'] = time_holder
-        data['Detection of node(s) failure, by the log (ms)'] = by_log
         data['Nodes failed'] = len(failed_nodes)
 
         return data
 
     @staticmethod
+    def __store_results(data, logged_timestamps, first_kill_time):
+        first_kill_time = epoch_mills(first_kill_time)
+
+        detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+
+        data['Detection of node(s) failure (ms)'] = detection_delay
+        data['All detection delays (ms):'] = str([epoch_mills(ts) - first_kill_time for ts in logged_timestamps])
+
+    @staticmethod
     def __failed_pattern(failed_node_id):
         return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
 
-    def __choose_node_to_kill(self, nodes_to_kill):
+    def __choose_node_to_kill(self, kill_coordinator, nodes_to_kill):
+        assert nodes_to_kill > 0, "No nodes to kill passed. Check the parameters."
+
         nodes = self.servers.nodes
         coordinator = nodes[0].discovery_info().coordinator
+        to_kill = []
 
-        if nodes_to_kill < 1:
-            to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
-        else:
-            to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+        if kill_coordinator:
+            to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
+            nodes_to_kill -= 1
 
-        to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
+        if nodes_to_kill > 0:
+            choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+            to_kill.extend([choice] if not isinstance(choice, list) else choice)
 
         survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
 
         return to_kill, survive
+
+    def __start_loading(self, ignite_version, properties, modules):
+        self.loader = IgniteApplicationService(
+            self.test_context,
+            java_class_name="org.apache.ignite.internal.ducktest.tests.ContinuousDataLoadApplication",
+            version=ignite_version,
+            modules=modules,
+            properties=properties,
+            params={"cacheName": "test-cache", "range": self.DATA_AMOUNT})
+
+        self.loader.start()
+
+    def __start_zk_quorum(self):
+        self.zk_quorum = ZookeeperService(self.test_context, 3)
+
+        self.zk_quorum.start()
+
+    @staticmethod
+    def __properties(zookeeper_settings=None):
+        """
+        :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
+        :return: Rendered node's properties.
+        """
+        return Template(DiscoveryTest.CONFIG_TEMPLATE) \
+            .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
+                    zookeeper_settings=zookeeper_settings)
+
+    @staticmethod
+    def __zk_properties(connection_string):
+        return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
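
The switch from @parametrize to @matrix means each test method now expands into the Cartesian product of its parameter lists: 2 versions x 2 kill_coordinator x 2 nodes_to_kill x 2 with_load = 16 runs per method. A rough illustration of the expansion (for intuition only; ducktape generates these combinations internally, and the version strings are placeholders for str(DEV_BRANCH) and str(LATEST_2_8)):

    from itertools import product

    versions = ["dev", "2.8"]            # placeholder version strings
    kill_coordinator = [False, True]
    nodes_to_kill = [1, 2]
    with_load = [False, True]

    # One test invocation per combination, 16 in total per method.
    for combo in product(versions, kill_coordinator, nodes_to_kill, with_load):
        print(combo)
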