You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/08/19 09:12:39 UTC

[GitHub] [ignite] anton-vinogradov commented on a change in pull request #8159: Loading in discovery tests.

anton-vinogradov commented on a change in pull request #8159:
URL: https://github.com/apache/ignite/pull/8159#discussion_r472847667



##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
 package org.apache.ignite.internal.ducktest.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  *
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+    /** */
+    private static final String PARAM_RANGE = "range";
+
+    /** */
+    private static final String PARAM_INFINITE = "infinite";
+
+    /** */
+    private static final String PARAM_CACHE_NAME = "cacheName";
+
+    /** */
+    private static final String PARAM_OPTIMIZED = "optimized";
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+    /** */
+    private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin generating data in background...";
+
     /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) {
-        log.info("Creating cache...");
+        String cacheName = jsonNode.get(PARAM_CACHE_NAME).asText();
+        boolean infinite = jsonNode.hasNonNull(PARAM_INFINITE) && jsonNode.get(PARAM_INFINITE).asBoolean();
+        boolean optimized = !jsonNode.hasNonNull(PARAM_OPTIMIZED) || jsonNode.get(PARAM_OPTIMIZED).asBoolean();
+        int range = jsonNode.get(PARAM_RANGE).asInt();
+
+        if (infinite) {
+            Random rnd = new Random();
+            CountDownLatch exitLatch = new CountDownLatch(1);
+
+            Thread th = new Thread(() -> {
+                log.info(WATCHEABLE_BEGIN_DATA_GEN_MSG);
+
+                boolean error = false;
+
+                try {
+                    while (!terminated())
+                        generateData(cacheName, range, (idx) -> rnd.nextInt(range), optimized);
+
+                    log.info("Background data generation finished.");
+                }
+                catch (Exception e) {
+                    if (!X.hasCause(e, NodeStoppingException.class)) {
+                        error = true;
+
+                        log.error("Failed to generate data in background.", e);
+                    }
+                }
+                finally {
+                    if (error)
+                        markBroken();
+                    else
+                        markFinished();
+
+                    exitLatch.countDown();
+                }
+
+            }, DataGenerationApplication.class.getName() + "_cacheLoader");
 
-        IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
+            th.start();
 
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
-            for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
-                stmr.addData(i, i);
+            markInitialized();
 
-                if (i % 10_000 == 0)
-                    log.info("Streamed " + i + " entries");
+            try {
+                exitLatch.await();
             }
+            catch (InterruptedException e) {

Review comment:
       You could declare the run method with "throws InterruptedException" instead of catching the exception here.

##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
 package org.apache.ignite.internal.ducktest.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  *
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+    /** */
+    private static final String PARAM_RANGE = "range";
+
+    /** */
+    private static final String PARAM_INFINITE = "infinite";
+
+    /** */
+    private static final String PARAM_CACHE_NAME = "cacheName";
+
+    /** */
+    private static final String PARAM_OPTIMIZED = "optimized";
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+    /** */
+    private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin generating data in background...";
+
     /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) {
-        log.info("Creating cache...");
+        String cacheName = jsonNode.get(PARAM_CACHE_NAME).asText();
+        boolean infinite = jsonNode.hasNonNull(PARAM_INFINITE) && jsonNode.get(PARAM_INFINITE).asBoolean();
+        boolean optimized = !jsonNode.hasNonNull(PARAM_OPTIMIZED) || jsonNode.get(PARAM_OPTIMIZED).asBoolean();
+        int range = jsonNode.get(PARAM_RANGE).asInt();
+
+        if (infinite) {
+            Random rnd = new Random();
+            CountDownLatch exitLatch = new CountDownLatch(1);
+
+            Thread th = new Thread(() -> {
+                log.info(WATCHEABLE_BEGIN_DATA_GEN_MSG);
+
+                boolean error = false;
+
+                try {
+                    while (!terminated())
+                        generateData(cacheName, range, (idx) -> rnd.nextInt(range), optimized);
+
+                    log.info("Background data generation finished.");
+                }
+                catch (Exception e) {
+                    if (!X.hasCause(e, NodeStoppingException.class)) {
+                        error = true;
+
+                        log.error("Failed to generate data in background.", e);
+                    }
+                }
+                finally {
+                    if (error)
+                        markBroken();
+                    else
+                        markFinished();
+
+                    exitLatch.countDown();
+                }
+
+            }, DataGenerationApplication.class.getName() + "_cacheLoader");
 
-        IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
+            th.start();
 
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
-            for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
-                stmr.addData(i, i);
+            markInitialized();
 
-                if (i % 10_000 == 0)
-                    log.info("Streamed " + i + " entries");
+            try {
+                exitLatch.await();
             }
+            catch (InterruptedException e) {
+                log.warn("Interrupted waiting for background loading.");
+            }
+        }
+        else {
+            log.info("Generating data...");
+
+            generateData(cacheName, range, Function.identity(), optimized);
+
+            log.info("Data generation finished. Generated " + range + " entries.");
+
+            markSyncExecutionComplete();
         }
+    }
+
+    /** */
+    private void generateData(String cacheName, int range, Function<Integer, Integer> supplier, boolean optimized) {
+        long notifyTime = System.nanoTime();
+        int streamed = 0;
+
+        if(log.isDebugEnabled())
+            log.debug("Creating cache...");
 
-        markSyncExecutionComplete();
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+
+        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);

Review comment:
       According to the documentation, "Data streamer will perform better if this flag is disabled." Is overwriting really needed here?

##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
 package org.apache.ignite.internal.ducktest.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  *
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+    /** */
+    private static final String PARAM_RANGE = "range";
+
+    /** */
+    private static final String PARAM_INFINITE = "infinite";
+
+    /** */
+    private static final String PARAM_CACHE_NAME = "cacheName";
+
+    /** */
+    private static final String PARAM_OPTIMIZED = "optimized";
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+    /** */
+    private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin generating data in background...";

Review comment:
       Is there any reason to have a constant that is used only once?

##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -45,6 +47,8 @@ class DiscoveryTest(IgniteTest):
 
     FAILURE_DETECTION_TIMEOUT = 2000
 
+    __DATA_AMOUNT = 100000

Review comment:
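       Why is the __ prefix needed? A double leading underscore triggers Python name mangling, which a simple class constant does not need.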

##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -230,17 +218,38 @@ def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
     def __failed_pattern(failed_node_id):
         return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
 
-    def __choose_node_to_kill(self, nodes_to_kill):
+    def __choose_node_to_kill(self, kill_coordinator, nodes_to_kill):
         nodes = self.servers.nodes
         coordinator = nodes[0].discovery_info().coordinator
+        to_kill = []
 
-        if nodes_to_kill < 1:
-            to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
-        else:
-            to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+        if kill_coordinator:
+            to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
 
-        to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
+        if nodes_to_kill > 0:
+            choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+            to_kill.extend([choice] if not isinstance(choice, list) else choice)
 
         survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
 
         return to_kill, survive
+
+    def __start_loading(self, ignite_version, properties, modules, wait_sec=0):
+        self.stage("Starting loading")
+
+        self.loader = IgniteApplicationService(
+            self.test_context,
+            java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
+            version=ignite_version,
+            modules=modules,
+            properties=properties,
+            params={"cacheName": "test-cache", "range": self.__DATA_AMOUNT, "infinite": True, "optimized": False})
+
+        self.loader.start()
+
+        for node in self.loader.nodes:
+            self.loader.await_event_on_node("Begin generating data in background...", node, 10, True, 1)

Review comment:
       .start() already waits for markInitialized(); use this happens-before guarantee (HB) instead of waiting for the log message.

##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
 package org.apache.ignite.internal.ducktest.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  *
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+    /** */
+    private static final String PARAM_RANGE = "range";
+
+    /** */
+    private static final String PARAM_INFINITE = "infinite";
+
+    /** */
+    private static final String PARAM_CACHE_NAME = "cacheName";
+
+    /** */
+    private static final String PARAM_OPTIMIZED = "optimized";
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+    /** */
+    private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin generating data in background...";
+
     /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) {
-        log.info("Creating cache...");
+        String cacheName = jsonNode.get(PARAM_CACHE_NAME).asText();
+        boolean infinite = jsonNode.hasNonNull(PARAM_INFINITE) && jsonNode.get(PARAM_INFINITE).asBoolean();

Review comment:
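       Use asBoolean(dflt) instead, e.g. jsonNode.path(PARAM_INFINITE).asBoolean(false) (path() returns a missing node rather than null, so the default applies when the parameter is absent).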

##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -91,81 +96,57 @@ def setUp(self):
         pass
 
     def teardown(self):
-        if self.zk_quorum:
-            self.zk_quorum.stop()
+        if self.loader:
+            self.loader.stop()
 
         if self.servers:
             self.servers.stop()
 
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_single(self, version):
-        """
-        Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_two(self, version):
-        """
-        Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 2)
+        if self.zk_quorum:
+            self.zk_quorum.stop()
 
     @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
-    @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_single(self, version):
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[0, 1, 2],
+            with_load=[False, True])
+    def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
         """
-        Test single node failure scenario (not the coordinator) with ZooKeeper.
+        Test nodes failure scenario with TcpDiscoverySpi.
         """
-        self.__start_zk_quorum()
-
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
+        return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, kill_coordinator,
+                                             nodes_to_kill, with_load)
 
     @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_two(self, version):
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[0, 1, 2],
+            with_load=[False, True])
+    def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
         """
-        Test two-node-failure scenario (not the coordinator) with ZooKeeper.
+        Test node failure scenario with ZooKeeperSpi.
         """
         self.__start_zk_quorum()
 
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
+        properties = self.__zk_properties(self.zk_quorum.connection_string())
+        modules = ["zookeeper"]
 
-    @cluster(num_nodes=NUM_NODES+3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with ZooKeeper.
-        """
-        self.__start_zk_quorum()
+        return self.__simulate_nodes_failure(ignite_version, properties, modules, kill_coordinator, nodes_to_kill,
+                                             with_load)
 
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+    def __simulate_nodes_failure(self, version, properties, modules, kill_coordinator=False, nodes_to_kill=1,
+                                 with_load=False):
+        if nodes_to_kill == 0 and not kill_coordinator:
+            return {"No nodes to kill": "Nothing to do"}
 
-    def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
         """
         :param nodes_to_kill: How many nodes to kill. If <1, the coordinator is the choice. Otherwise: not-coordinator
         nodes of given number.
         """
         self.servers = IgniteService(
             self.test_context,
-            num_nodes=self.NUM_NODES,
-            modules=["zookeeper"],
+            num_nodes=self.NUM_NODES - 1 if with_load else self.NUM_NODES,

Review comment:
       This will produce results that cannot be compared between runs with and without load.
   Let's use clusters of the same size.

##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
 package org.apache.ignite.internal.ducktest.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  *
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Logger. */
+    protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+    /** */
+    private static final String PARAM_RANGE = "range";
+
+    /** */
+    private static final String PARAM_INFINITE = "infinite";
+
+    /** */
+    private static final String PARAM_CACHE_NAME = "cacheName";
+
+    /** */
+    private static final String PARAM_OPTIMIZED = "optimized";
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+    /** */
+    private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+    /** */
+    private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin generating data in background...";
+
     /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) {
-        log.info("Creating cache...");
+        String cacheName = jsonNode.get(PARAM_CACHE_NAME).asText();
+        boolean infinite = jsonNode.hasNonNull(PARAM_INFINITE) && jsonNode.get(PARAM_INFINITE).asBoolean();
+        boolean optimized = !jsonNode.hasNonNull(PARAM_OPTIMIZED) || jsonNode.get(PARAM_OPTIMIZED).asBoolean();
+        int range = jsonNode.get(PARAM_RANGE).asInt();
+
+        if (infinite) {
+            Random rnd = new Random();
+            CountDownLatch exitLatch = new CountDownLatch(1);
+
+            Thread th = new Thread(() -> {
+                log.info(WATCHEABLE_BEGIN_DATA_GEN_MSG);
+
+                boolean error = false;
+
+                try {
+                    while (!terminated())
+                        generateData(cacheName, range, (idx) -> rnd.nextInt(range), optimized);
+
+                    log.info("Background data generation finished.");
+                }
+                catch (Exception e) {
+                    if (!X.hasCause(e, NodeStoppingException.class)) {
+                        error = true;
+
+                        log.error("Failed to generate data in background.", e);
+                    }
+                }
+                finally {
+                    if (error)
+                        markBroken();
+                    else
+                        markFinished();
+
+                    exitLatch.countDown();
+                }
+
+            }, DataGenerationApplication.class.getName() + "_cacheLoader");
 
-        IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
+            th.start();
 
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
-            for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
-                stmr.addData(i, i);
+            markInitialized();
 
-                if (i % 10_000 == 0)
-                    log.info("Streamed " + i + " entries");
+            try {
+                exitLatch.await();

Review comment:
       What is the reason to have a separate thread here while another thread waits for it synchronously?
   Can we increase the possible number of loader threads?

##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -230,17 +218,38 @@ def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
     def __failed_pattern(failed_node_id):
         return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
 
-    def __choose_node_to_kill(self, nodes_to_kill):
+    def __choose_node_to_kill(self, kill_coordinator, nodes_to_kill):
         nodes = self.servers.nodes
         coordinator = nodes[0].discovery_info().coordinator
+        to_kill = []
 
-        if nodes_to_kill < 1:
-            to_kill = next(node for node in nodes if node.discovery_info().node_id == coordinator)
-        else:
-            to_kill = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+        if kill_coordinator:
+            to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
 
-        to_kill = [to_kill] if not isinstance(to_kill, list) else to_kill
+        if nodes_to_kill > 0:
+            choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
+            to_kill.extend([choice] if not isinstance(choice, list) else choice)
 
         survive = random.choice([node for node in self.servers.nodes if node not in to_kill])
 
         return to_kill, survive
+
+    def __start_loading(self, ignite_version, properties, modules, wait_sec=0):
+        self.stage("Starting loading")
+
+        self.loader = IgniteApplicationService(
+            self.test_context,
+            java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
+            version=ignite_version,
+            modules=modules,
+            properties=properties,
+            params={"cacheName": "test-cache", "range": self.__DATA_AMOUNT, "infinite": True, "optimized": False})
+
+        self.loader.start()
+
+        for node in self.loader.nodes:
+            self.loader.await_event_on_node("Begin generating data in background...", node, 10, True, 1)
+
+        if wait_sec > 0:
+            self.logger.info("Waiting for the data load for " + str(wait_sec) + " seconds...")

Review comment:
       This should be handled by the application as well.

##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -91,81 +96,57 @@ def setUp(self):
         pass
 
     def teardown(self):
-        if self.zk_quorum:
-            self.zk_quorum.stop()
+        if self.loader:
+            self.loader.stop()
 
         if self.servers:
             self.servers.stop()
 
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_single(self, version):
-        """
-        Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
-    @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_not_coordinator_two(self, version):
-        """
-        Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 2)
+        if self.zk_quorum:
+            self.zk_quorum.stop()
 
     @cluster(num_nodes=NUM_NODES)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_tcp_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with TcpDiscoverySpi.
-        """
-        return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
-    @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_single(self, version):
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[0, 1, 2],
+            with_load=[False, True])
+    def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
         """
-        Test single node failure scenario (not the coordinator) with ZooKeeper.
+        Test nodes failure scenario with TcpDiscoverySpi.
         """
-        self.__start_zk_quorum()
-
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
+        return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, kill_coordinator,
+                                             nodes_to_kill, with_load)
 
     @cluster(num_nodes=NUM_NODES + 3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_not_coordinator_two(self, version):
+    @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+            kill_coordinator=[False, True],
+            nodes_to_kill=[0, 1, 2],
+            with_load=[False, True])
+    def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
         """
-        Test two-node-failure scenario (not the coordinator) with ZooKeeper.
+        Test node failure scenario with ZooKeeperSpi.
         """
         self.__start_zk_quorum()
 
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
+        properties = self.__zk_properties(self.zk_quorum.connection_string())
+        modules = ["zookeeper"]
 
-    @cluster(num_nodes=NUM_NODES+3)
-    @parametrize(version=str(DEV_BRANCH))
-    @parametrize(version=str(LATEST_2_7))
-    def test_zk_coordinator(self, version):
-        """
-        Test coordinator-failure scenario with ZooKeeper.
-        """
-        self.__start_zk_quorum()
+        return self.__simulate_nodes_failure(ignite_version, properties, modules, kill_coordinator, nodes_to_kill,
+                                             with_load)
 
-        return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+    def __simulate_nodes_failure(self, version, properties, modules, kill_coordinator=False, nodes_to_kill=1,
+                                 with_load=False):
+        if nodes_to_kill == 0 and not kill_coordinator:
+            return {"No nodes to kill": "Nothing to do"}
 
-    def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
         """

Review comment:
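       It seems you moved the method description to the wrong place: it now follows the early-return check instead of being the first statement of the method, so Python no longer treats it as the docstring.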




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org