You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/08/26 09:37:26 UTC
[ignite] branch ignite-ducktape updated: Loading in discovery
tests. (#8159)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new c6f5504 Loading in discovery tests. (#8159)
c6f5504 is described below
commit c6f55041ad4033d94cda759995c4245192d8bad4
Author: Vladsz83 <vl...@gmail.com>
AuthorDate: Wed Aug 26 12:36:58 2020 +0300
Loading in discovery tests. (#8159)
---
.../tests/ContinuousDataLoadApplication.java | 74 +++++++
.../ducktest/utils/IgniteAwareApplication.java | 25 ++-
.../tests/ignitetest/services/utils/ignite_spec.py | 1 +
.../tests/ignitetest/tests/discovery_test.py | 225 ++++++++++-----------
4 files changed, 203 insertions(+), 122 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
new file mode 100644
index 0000000..46a4b76
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests;
+
+import java.util.Random;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Continuously loads data into a cache until the application is stopped.
+ */
+public class ContinuousDataLoadApplication extends IgniteAwareApplication {
+ /** Logger. */
+ private static final Logger log = LogManager.getLogger(ContinuousDataLoadApplication.class.getName());
+
+ /** {@inheritDoc} */
+ @Override protected void run(JsonNode jsonNode) {
+ String cacheName = jsonNode.get("cacheName").asText();
+ int range = jsonNode.get("range").asInt();
+
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+
+ int warmUpCnt = (int)Math.max(1, 0.1f * range);
+
+ Random rnd = new Random();
+
+ long streamed = 0;
+
+ log.info("Generating data in background...");
+
+ long notifyTime = System.nanoTime();
+
+ while (active()) {
+ cache.put(rnd.nextInt(range), rnd.nextInt(range));
+
+ streamed++;
+
+ if (notifyTime + U.millisToNanos(1500) < System.nanoTime()) {
+ notifyTime = System.nanoTime();
+
+ if (log.isDebugEnabled())
+ log.debug("Streamed " + streamed + " entries.");
+ }
+
+ // Delay the initialization notification to make sure the data load has completely begun and
+ // has produced a meaningful amount of data.
+ if (!inited() && warmUpCnt == streamed)
+ markInitialized();
+ }
+
+ log.info("Background data generation finished.");
+
+ markFinished();
+ }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index 5e610f1..107787b 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -76,8 +76,11 @@ public abstract class IgniteAwareApplication {
else
log.info("Application already done [finished=" + finished + ", broken=" + broken + "]");
+ if (log.isDebugEnabled())
+ log.debug("Waiting for graceful termination...");
+
while (!finished && !broken) {
- log.info("Waiting for graceful termnation.");
+ log.info("Waiting for graceful termination cycle...");
try {
U.sleep(100);
@@ -86,6 +89,9 @@ public abstract class IgniteAwareApplication {
e.printStackTrace();
}
}
+
+ if (log.isDebugEnabled())
+ log.debug("Graceful termination done.");
}));
log.info("ShutdownHook registered.");
@@ -111,7 +117,8 @@ public abstract class IgniteAwareApplication {
log.info(APP_FINISHED);
- removeShutdownHook();
+ if (!terminated())
+ removeShutdownHook();
finished = true;
}
@@ -166,6 +173,20 @@ public abstract class IgniteAwareApplication {
}
/**
+ * @return Current value of the {@code inited} flag.
+ */
+ protected boolean inited() {
+ return inited;
+ }
+
+ /**
+ * @return {@code true} while the application is neither terminated, broken nor finished.
+ */
+ protected boolean active() {
+ return !(terminated || broken || finished);
+ }
+
+ /**
* @param name Name.
* @param val Value.
*/
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index f992e3e..9cbfcc0 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -190,6 +190,7 @@ class ApacheIgniteApplicationSpec(IgniteApplicationSpec, IgnitePersistenceAware)
self.jvm_opts.extend([
"-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file ",
"-Dlog4j.configDebug=true",
+ "-DIGNITE_NO_SHUTDOWN_HOOK=true", # allows performing operations on app termination.
"-Xmx1G",
"-ea",
"-DIGNITE_ALLOW_ATOMIC_OPS_IN_TX=false"
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 8aec38f..63f6ef8 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -21,16 +21,17 @@ import random
import re
from datetime import datetime
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from jinja2 import Template
+from ignitetest.services.ignite import IgniteAwareService
from ignitetest.services.ignite import IgniteService
-from ignitetest.services.utils.ignite_aware import IgniteAwareService
+from ignitetest.services.ignite_app import IgniteApplicationService
from ignitetest.services.utils.time_utils import epoch_mills
from ignitetest.services.zk.zookeeper import ZookeeperService
from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, LATEST_2_7
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8
# pylint: disable=W0223
@@ -41,10 +42,21 @@ class DiscoveryTest(IgniteTest):
2. Kill random node.
3. Wait that survived node detects node failure.
"""
+ class Config:
+ """
+ Configuration for DiscoveryTest.
+ """
+ def __init__(self, nodes_to_kill=1, kill_coordinator=False, with_load=False):
+ self.nodes_to_kill = nodes_to_kill
+ self.kill_coordinator = kill_coordinator
+ self.with_load = with_load
+
NUM_NODES = 7
FAILURE_DETECTION_TIMEOUT = 2000
+ DATA_AMOUNT = 100000
+
CONFIG_TEMPLATE = """
<property name="failureDetectionTimeout" value="{{ failure_detection_timeout }}"/>
{% if zookeeper_settings %}
@@ -63,145 +75,85 @@ class DiscoveryTest(IgniteTest):
super().__init__(test_context=test_context)
self.zk_quorum = None
self.servers = None
+ self.loader = None
- def __start_zk_quorum(self):
- self.zk_quorum = ZookeeperService(self.test_context, 3)
-
- self.stage("Starting ZooKeeper quorum")
-
- self.zk_quorum.start()
+ @cluster(num_nodes=NUM_NODES)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[1, 2],
+ with_load=[False, True])
+ def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
+ """
+ Test nodes failure scenario with TcpDiscoverySpi.
+ """
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- self.stage("ZooKeeper quorum started")
+ return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, config)
- @staticmethod
- def __properties(zookeeper_settings=None):
+ @cluster(num_nodes=NUM_NODES + 3)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[1, 2],
+ with_load=[False, True])
+ def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
"""
- :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
- :return: Rendered node's properties.
+ Test node failure scenario with ZooKeeperDiscoverySpi.
"""
- return Template(DiscoveryTest.CONFIG_TEMPLATE) \
- .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
- zookeeper_settings=zookeeper_settings)
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- @staticmethod
- def __zk_properties(connection_string):
- return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
+ self.__start_zk_quorum()
+
+ properties = self.__zk_properties(self.zk_quorum.connection_string())
+ modules = ["zookeeper"]
+
+ return self.__simulate_nodes_failure(ignite_version, properties, modules, config)
def setUp(self):
pass
def teardown(self):
- if self.zk_quorum:
- self.zk_quorum.stop()
+ if self.loader:
+ self.loader.stop()
if self.servers:
self.servers.stop()
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_single(self, version):
- """
- Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 2)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_coordinator(self, version):
- """
- Test coordinator-failure scenario with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_single(self, version):
- """
- Test single node failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
-
- @cluster(num_nodes=NUM_NODES+3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_coordinator(self, version):
- """
- Test coordinator-failure scenario with ZooKeeper.
- """
- self.__start_zk_quorum()
+ if self.zk_quorum:
+ self.zk_quorum.stop()
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+ def __simulate_nodes_failure(self, version, properties, modules, config):
+ if config.nodes_to_kill < 1:
+ return {"No nodes to kill": "Nothing to do"}
- def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
- """
- :param nodes_to_kill: How many nodes to kill. If <1, the coordinator is the choice. Otherwise: not-coordinator
- nodes of given number.
- """
self.servers = IgniteService(
self.test_context,
- num_nodes=self.NUM_NODES,
- modules=["zookeeper"],
+ num_nodes=self.NUM_NODES - 1,
+ modules=modules,
properties=properties,
version=version)
- self.stage("Starting ignite cluster")
-
time_holder = self.monotonic()
self.servers.start()
- if nodes_to_kill > self.servers.num_nodes - 1:
- raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
-
data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
- self.stage("Topology is ready")
- failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
+ failed_nodes, survived_node = self.__choose_node_to_kill(config.kill_coordinator, config.nodes_to_kill)
ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
- self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
+ if config.with_load:
+ self.__start_loading(version, properties, modules)
first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
- self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
-
# Keeps dates of logged node failures.
logged_timestamps = []
for failed_id in ids_to_wait:
- self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
- from_the_beginning=True, backoff_sec=0.01)
- # Save mono of last detected failure.
- time_holder = self.monotonic()
- self.stage("Failure detection measured.")
+ self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 20,
+ from_the_beginning=True, backoff_sec=0.1)
- for failed_id in ids_to_wait:
_, stdout, _ = survived_node.account.ssh_client.exec_command(
"grep '%s' %s" % (self.__failed_pattern(failed_id), IgniteAwareService.STDOUT_STDERR_CAPTURE))
@@ -211,37 +163,70 @@ class DiscoveryTest(IgniteTest):
logged_timestamps.sort(reverse=True)
- # Failure detection delay.
- time_holder = int((time_holder - first_terminated[0]) * 1000)
- # Failure detection delay by log.
- by_log = epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1])
-
- assert by_log > 0, "Negative node failure detection delay: " + by_log + ". Probably it is a timezone issue."
- assert by_log <= time_holder, "Value of node failure detection delay taken from by the node log (" + \
- str(by_log) + "ms) must be lesser than measured value (" + str(time_holder) + \
- "ms) because watching this event consumes extra time."
+ self.__store_results(data, logged_timestamps, first_terminated[1])
- data['Detection of node(s) failure, measured (ms)'] = time_holder
- data['Detection of node(s) failure, by the log (ms)'] = by_log
data['Nodes failed'] = len(failed_nodes)
return data
@staticmethod
+ def __store_results(data, logged_timestamps, first_kill_time):
+ first_kill_time = epoch_mills(first_kill_time)
+
+ detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+
+ data['Detection of node(s) failure (ms)'] = detection_delay
+ data['All detection delays (ms):'] = str([epoch_mills(ts) - first_kill_time for ts in logged_timestamps])
+
+ @staticmethod
def __failed_pattern(failed_node_id):
return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
- def __choose_node_to_kill(self, nodes_to_kill):
+ def __choose_node_to_kill(self, kill_coordinator, nodes_to_kill):
+ assert nodes_to_kill > 0, "No nodes to kill passed. Check the parameters."
+
nodes = self.servers.nodes
coordinator = nodes[0].discovery_info().coordinator
+ to_kill = []
- if nodes_to_kill < 1:
- to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
- else:
- to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+ if kill_coordinator:
+ to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
+ nodes_to_kill -= 1
- to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
+ if nodes_to_kill > 0:
+ choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+ to_kill.extend([choice] if not isinstance(choice, list) else choice)
survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
return to_kill, survive
+
+ def __start_loading(self, ignite_version, properties, modules):
+ self.loader = IgniteApplicationService(
+ self.test_context,
+ java_class_name="org.apache.ignite.internal.ducktest.tests.ContinuousDataLoadApplication",
+ version=ignite_version,
+ modules=modules,
+ properties=properties,
+ params={"cacheName": "test-cache", "range": self.DATA_AMOUNT})
+
+ self.loader.start()
+
+ def __start_zk_quorum(self):
+ self.zk_quorum = ZookeeperService(self.test_context, 3)
+
+ self.zk_quorum.start()
+
+ @staticmethod
+ def __properties(zookeeper_settings=None):
+ """
+ :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
+ :return: Rendered node's properties.
+ """
+ return Template(DiscoveryTest.CONFIG_TEMPLATE) \
+ .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
+ zookeeper_settings=zookeeper_settings)
+
+ @staticmethod
+ def __zk_properties(connection_string):
+ return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})