You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/29 09:36:05 UTC

[ignite] branch ignite-ducktape updated: IGNITE-14391 Multi-node cache data preloading (#8927)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new ac544aa  IGNITE-14391 Multi-node cache data preloading (#8927)
ac544aa is described below

commit ac544aab1d2d7ed939db2a7dcdc6d65817697227
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Mon Mar 29 12:35:09 2021 +0300

    IGNITE-14391 Multi-node cache data preloading (#8927)
---
 .../tests/rebalance/DataGenerationApplication.java | 35 ++++++--------
 .../tests/ignitetest/tests/rebalance/__init__.py   | 55 +++++++++++++++++-----
 .../ignitetest/tests/rebalance/in_memory_test.py   | 22 ++++-----
 3 files changed, 67 insertions(+), 45 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
index ce3041c..c9d3545 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -29,14 +29,15 @@ import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
  */
 public class DataGenerationApplication extends IgniteAwareApplication {
     /** Max streamer data size. */
-    private static final long MAX_STREAMER_DATA_SIZE = 100_000_000;
+    private static final int MAX_STREAMER_DATA_SIZE = 100_000_000;
 
     /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) throws Exception {
         int backups = jsonNode.get("backups").asInt();
         int cacheCnt = jsonNode.get("cacheCount").asInt();
-        int entryCnt = jsonNode.get("entryCount").asInt();
         int entrySize = jsonNode.get("entrySize").asInt();
+        int from = jsonNode.get("from").asInt();
+        int to = jsonNode.get("to").asInt();
 
         markInitialized();
 
@@ -46,7 +47,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
                 new CacheConfiguration<Integer, DataModel>("test-cache-" + i)
                     .setBackups(backups));
 
-            generateCacheData(cache.getName(), entryCnt, entrySize);
+            generateCacheData(cache.getName(), entrySize, from, to);
         }
 
         markFinished();
@@ -54,35 +55,27 @@ public class DataGenerationApplication extends IgniteAwareApplication {
 
     /**
      * @param cacheName Cache name.
-     * @param entryCnt Entry count.
      * @param entrySize Entry size.
+     * @param from From key (inclusive).
+     * @param to To key (exclusive).
      */
-    private void generateCacheData(String cacheName, int entryCnt, int entrySize) {
-        int logStreamedEntriesQuant = (int)Math.pow(10, (int)Math.log10(entryCnt) - 1);
+    private void generateCacheData(String cacheName, int entrySize, int from, int to) {
+        int flushEach = MAX_STREAMER_DATA_SIZE / entrySize + (MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1);
+        int logEach = Math.max(1, (to - from) / 10); // Never 0, otherwise "% logEach" throws on ranges < 10.
 
         try (IgniteDataStreamer<Integer, DataModel> stmr = ignite.dataStreamer(cacheName)) {
-            for (int i = 0, n = 0; i < entryCnt; i++) {
+            for (int i = from; i < to; i++) {
                 stmr.addData(i, new DataModel(entrySize));
 
-                int streamed = i + 1;
-
-                if (streamed % logStreamedEntriesQuant == 0)
-                    log.info("Streamed " + streamed + " entries into " + cacheName);
-
-                n += entrySize;
-
-                if (n >= MAX_STREAMER_DATA_SIZE) {
-                    n = 0;
+                if ((i - from + 1) % logEach == 0 && log.isDebugEnabled())
+                    log.debug("Streamed " + (i - from + 1) + " entries into " + cacheName);
 
+                if ((i - from + 1) % flushEach == 0) // Flush per streamed entries, not per absolute key value.
                     stmr.flush();
-                }
             }
         }
 
-        if (entryCnt % logStreamedEntriesQuant != 0)
-            log.info("Streamed " + entryCnt + " entries into " + cacheName);
-
-        log.info(cacheName + " data generated.");
+        log.info(cacheName + " data generated [entryCnt=" + (to - from) + ", from=" + from + ", to=" + to + "]");
     }
 
     /**
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
index 862b674..9ec1b96 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -28,11 +28,12 @@ from ignitetest.services.ignite_app import IgniteApplicationService
 
 
 # pylint: disable=too-many-arguments
-def preload_data(context, config, backups, cache_count, entry_count, entry_size, timeout=3600):
+def preload_data(context, config, preloaders, backups, cache_count, entry_count, entry_size, timeout=3600):
     """
     Puts entry_count of key-value pairs of entry_size bytes to cache_count caches.
     :param context: Test context.
     :param config: Ignite configuration.
+    :param preloaders: Preload client nodes count.
     :param backups: Cache backups count.
     :param cache_count: Cache count.
     :param entry_count: Cache entry count.
@@ -40,18 +41,46 @@ def preload_data(context, config, backups, cache_count, entry_count, entry_size,
     :param timeout: Timeout in seconds for application finished.
     :return: Time taken for data preloading.
     """
-    app = IgniteApplicationService(
-        context,
-        config=config,
-        java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
-        params={"backups": backups, "cacheCount": cache_count, "entryCount": entry_count, "entrySize": entry_size},
-        startup_timeout_sec=timeout)
-    app.run()
-
-    app.await_started()
-    app.await_stopped()
-
-    return (app.get_finish_time() - app.get_init_time()).total_seconds()
+    assert preloaders > 0
+    assert cache_count > 0
+    assert entry_count > 0
+    assert entry_size > 0
+
+    apps = []
+
+    def start_app(from_, to_):
+        app0 = IgniteApplicationService(
+            context,
+            config=config,
+            java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
+            params={
+                "backups": backups,
+                "cacheCount": cache_count,
+                "entrySize": entry_size,
+                "from": from_,
+                "to": to_
+            },
+            shutdown_timeout_sec=timeout)
+        app0.start_async()
+
+        apps.append(app0)
+
+    count = int(entry_count / preloaders)
+    _from = 0
+    _to = 0
+
+    for _ in range(preloaders - 1):
+        _from = _to
+        _to += count
+        start_app(_from, _to)
+
+    start_app(_to, entry_count)
+
+    for app1 in apps:
+        app1.await_stopped()
+
+    return (max(map(lambda app: app.get_finish_time(), apps)) -
+            min(map(lambda app: app.get_init_time(), apps))).total_seconds()
 
 
 def await_rebalance_start(ignite, timeout=1):
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index 5056b7c..74b7921 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -52,36 +52,36 @@ class RebalanceInMemoryTest(IgniteTest):
     # pylint: disable=too-many-arguments, too-many-locals
     @cluster(num_nodes=NUM_NODES)
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+    @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
               thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
     def test_node_join(self, ignite_version,
-                       backups, cache_count, entry_count, entry_size,
+                       backups, cache_count, entry_count, entry_size, preloaders,
                        thread_pool_size, batch_size, batches_prefetch_count, throttle):
         """
         Tests rebalance on node join.
         """
         return self.__run(ignite_version, TriggerEvent.NODE_JOIN,
-                          backups, cache_count, entry_count, entry_size,
+                          backups, cache_count, entry_count, entry_size, preloaders,
                           thread_pool_size, batch_size, batches_prefetch_count, throttle)
 
     # pylint: disable=too-many-arguments, too-many-locals
     @cluster(num_nodes=NUM_NODES)
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+    @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
               thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
     def test_node_left(self, ignite_version,
-                       backups, cache_count, entry_count, entry_size,
+                       backups, cache_count, entry_count, entry_size, preloaders,
                        thread_pool_size, batch_size, batches_prefetch_count, throttle):
         """
         Tests rebalance on node left.
         """
         return self.__run(ignite_version, TriggerEvent.NODE_LEFT,
-                          backups, cache_count, entry_count, entry_size,
+                          backups, cache_count, entry_count, entry_size, preloaders,
                           thread_pool_size, batch_size, batches_prefetch_count, throttle)
 
     # pylint: disable=too-many-arguments, too-many-locals
     def __run(self, ignite_version, trigger_event,
-              backups, cache_count, entry_count, entry_size,
+              backups, cache_count, entry_count, entry_size, preloaders,
               thread_pool_size, batch_size, batches_prefetch_count, throttle):
         """
         Test performs rebalance test which consists of following steps:
@@ -94,13 +94,14 @@ class RebalanceInMemoryTest(IgniteTest):
         :param cache_count: Cache count.
         :param entry_count: Cache entry count.
         :param entry_size: Cache entry size.
+        :param preloaders: Preload application nodes count.
         :param thread_pool_size: rebalanceThreadPoolSize config property.
         :param batch_size: rebalanceBatchSize config property.
         :param batches_prefetch_count: rebalanceBatchesPrefetchCount config property.
         :param throttle: rebalanceThrottle config property.
         :return: Rebalance and data preload stats.
         """
-        node_count = len(self.test_context.cluster) - 1
+        node_count = len(self.test_context.cluster) - preloaders
 
         node_config = IgniteConfiguration(
             version=IgniteVersion(ignite_version),
@@ -121,7 +122,7 @@ class RebalanceInMemoryTest(IgniteTest):
         preload_time = preload_data(
             self.test_context,
             node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)),
-            backups, cache_count, entry_count, entry_size)
+            preloaders, backups, cache_count, entry_count, entry_size)
 
         if trigger_event:
             ignites.stop_node(ignites.nodes[node_count - 1])
@@ -148,6 +149,5 @@ class RebalanceInMemoryTest(IgniteTest):
             "Rebalance speed (Total, MiB/sec)": speed((stats.end_time - stats.start_time).total_seconds()),
             "Rebalance speed (Average per node, MiB/sec)": speed(stats.duration),
             "Preloaded in (sec)": preload_time,
-            "Preload speed (MiB/sec)":
-                int(cache_count * entry_count * entry_size * 1000 / 1024 / 1024 / preload_time) / 1000.0
+            "Preloaded data size (MiB)": round(cache_count * entry_count * entry_size / (1024 * 1024), 3)
         }