You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/15 18:06:51 UTC

[ignite] branch ignite-ducktape updated: IGNITE-14228 Ducktape test of rebalance for in-memory mode (#8832)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 572133b  IGNITE-14228 Ducktape test of rebalance for in-memory mode (#8832)
572133b is described below

commit 572133b1b9978e35b93ea5a10abc090fa7813d70
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Mon Mar 15 21:06:31 2021 +0300

    IGNITE-14228 Ducktape test of rebalance for in-memory mode (#8832)
---
 .../ducktest/tests/DataGenerationApplication.java  |  46 --------
 .../tests/rebalance/DataGenerationApplication.java | 119 +++++++++++++++++++++
 .../utils/ignite_configuration/__init__.py         |   4 +
 .../services/utils/templates/ignite.xml.j2         |  12 +++
 .../ignitetest/tests/add_node_rebalance_test.py    |  74 -------------
 .../tests/ignitetest/tests/rebalance/__init__.py   |  95 ++++++++++++++++
 .../ignitetest/tests/rebalance/in_memory_test.py   | 104 ++++++++++++++++++
 .../tests/ignitetest/tests/suites/fast_suite.yml   |   2 +-
 8 files changed, 335 insertions(+), 121 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
deleted file mode 100644
index a65644a..0000000
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.ducktest.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
-
-/**
- *
- */
-public class DataGenerationApplication extends IgniteAwareApplication {
-    /** {@inheritDoc} */
-    @Override protected void run(JsonNode jsonNode) {
-        log.info("Creating cache...");
-
-        IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
-
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
-            for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
-                stmr.addData(i, i);
-
-                if (i % 10_000 == 0)
-                    log.info("Streamed " + i + " entries");
-            }
-        }
-
-        markSyncExecutionComplete();
-    }
-}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
new file mode 100644
index 0000000..1e454af
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.rebalance;
+
+import java.util.concurrent.ThreadLocalRandom;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Application that fills the "test-cache-N" caches with generated data according to the passed JSON parameters.
+ */
+public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Payload volume in bytes that is streamed between two explicit streamer flushes. */
+    private static final long MAX_STREAMER_DATA_SIZE = 100_000_000;
+
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        int backups = jsonNode.get("backups").asInt();
+        int cacheCnt = jsonNode.get("cacheCount").asInt();
+        int entryCnt = jsonNode.get("entryCount").asInt();
+        int entrySize = jsonNode.get("entrySize").asInt();
+
+        log.info("Data generation started [backups=" + backups + ", cacheCount=" + cacheCnt
+            + ", entryCount=" + entryCnt + ", entrySize=" + entrySize + "]");
+
+        for (int i = 1; i <= cacheCnt; i++) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-14319
+            IgniteCache<Integer, DataModel> cache = ignite.getOrCreateCache(
+                new CacheConfiguration<Integer, DataModel>("test-cache-" + i)
+                    .setBackups(backups));
+
+            generateCacheData(cache.getName(), entryCnt, entrySize);
+        }
+
+        markSyncExecutionComplete();
+    }
+
+    /**
+     * Streams the generated entries into the cache, logging the progress and flushing the streamer periodically.
+     * @param cacheName Name of the cache to stream the entries into.
+     * @param entryCnt Entry count; the keys are {@code 0..entryCnt-1}.
+     * @param entrySize Payload size of each entry value in bytes.
+     */
+    private void generateCacheData(String cacheName, int entryCnt, int entrySize) {
+        // Never below 1: for entryCnt < 10 the formula yields 0, which would make the modulo below throw.
+        int logStreamedEntriesQuant = Math.max(1, (int)Math.pow(10, (int)Math.log10(entryCnt) - 1));
+
+        try (IgniteDataStreamer<Integer, DataModel> stmr = ignite.dataStreamer(cacheName)) {
+            for (int i = 0, n = 0; i < entryCnt; i++) {
+                stmr.addData(i, new DataModel(entrySize));
+
+                if ((i + 1) % logStreamedEntriesQuant == 0)
+                    log.info("Streamed " + (i + 1) + " entries into " + cacheName);
+
+                n += entrySize;
+
+                if (n >= MAX_STREAMER_DATA_SIZE) {
+                    n = 0;
+
+                    stmr.flush();
+                }
+            }
+        }
+
+        if (entryCnt % logStreamedEntriesQuant != 0)
+            log.info("Streamed " + entryCnt + " entries into " + cacheName);
+
+        log.info(cacheName + " data generated.");
+    }
+
+    /**
+     * Data model class, which instances are used as cache entry values.
+     */
+    private static class DataModel {
+        /** Random payload shared by the instances; regenerated only when another size is requested. */
+        private static byte[] cachedPayload;
+
+        /** Payload; references the shared cached array rather than a private copy. */
+        private final byte[] payload;
+
+        /**
+         * @param entrySize Entry size, i.e. the payload length in bytes.
+         */
+        DataModel(int entrySize) {
+            payload = getPayload(entrySize);
+        }
+
+        /**
+         * @param payloadSize Payload size.
+         * @return Cached random payload of the requested size.
+         */
+        private static byte[] getPayload(int payloadSize) {
+            if (cachedPayload == null || cachedPayload.length != payloadSize) {
+                cachedPayload = new byte[payloadSize];
+                ThreadLocalRandom.current().nextBytes(cachedPayload);
+            }
+
+            return cachedPayload;
+        }
+    }
+}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index e0d1494..c430d0f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -48,6 +48,10 @@ class IgniteConfiguration(NamedTuple):
     auth_enabled: bool = False
     plugins: list = []
     metric_exporter: str = None
+    rebalance_thread_pool_size: int = None
+    rebalance_batch_size: int = None
+    rebalance_batches_prefetch_count: int = None
+    rebalance_throttle: int = None
 
 
 class IgniteClientConfiguration(IgniteConfiguration):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index a25c659..d7bf18f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -42,6 +42,18 @@
         <property name="failureDetectionTimeout" value="{{ config.failure_detection_timeout }}"/>
         <property name="systemWorkerBlockedTimeout" value="{{ config.sys_worker_blocked_timeout }}"/>
         <property name="authenticationEnabled" value="{{ config.auth_enabled | lower }}"/>
+        {% if config.rebalance_thread_pool_size %}
+            <property name="rebalanceThreadPoolSize" value="{{ config.rebalance_thread_pool_size }}"/>
+        {% endif %}
+        {% if config.rebalance_batch_size %}
+            <property name="rebalanceBatchSize" value="{{ config.rebalance_batch_size }}"/>
+        {% endif %}
+        {% if config.rebalance_batches_prefetch_count %}
+            <property name="rebalanceBatchesPrefetchCount" value="{{ config.rebalance_batches_prefetch_count }}"/>
+        {% endif %}
+        {% if config.rebalance_throttle %}
+            <property name="rebalanceThrottle" value="{{ config.rebalance_throttle }}"/>
+        {% endif %}
 
         {% if config.metric_exporter %}
             <property name="metricExporterSpi">
diff --git a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
deleted file mode 100644
index 5511805..0000000
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Module contains node rebalance tests.
-"""
-
-from ignitetest.services.ignite import IgniteService
-from ignitetest.services.ignite_app import IgniteApplicationService
-from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
-from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
-from ignitetest.utils import ignite_versions, cluster
-from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST
-
-
-# pylint: disable=W0223
-class AddNodeRebalanceTest(IgniteTest):
-    """
-    Test basic rebalance scenarios.
-    """
-    NUM_NODES = 4
-    PRELOAD_TIMEOUT = 60
-    DATA_AMOUNT = 1000000
-    REBALANCE_TIMEOUT = 60
-
-    @cluster(num_nodes=NUM_NODES + 1)
-    @ignite_versions(str(DEV_BRANCH), str(LATEST))
-    def test_add_node(self, ignite_version):
-        """
-        Test performs add node rebalance test which consists of following steps:
-            * Start cluster.
-            * Put data to it via IgniteClientApp.
-            * Start one more node and awaits for rebalance to finish.
-        """
-        node_config = IgniteConfiguration(version=IgniteVersion(ignite_version))
-
-        ignites = IgniteService(self.test_context, config=node_config, num_nodes=self.NUM_NODES - 1)
-        ignites.start()
-
-        # This client just put some data to the cache.
-        app_config = node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites))
-        IgniteApplicationService(self.test_context, config=app_config,
-                                 java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
-                                 params={"cacheName": "test-cache", "range": self.DATA_AMOUNT},
-                                 startup_timeout_sec=self.PRELOAD_TIMEOUT).run()
-
-        ignite = IgniteService(self.test_context, node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
-                               num_nodes=1)
-
-        ignite.start()
-
-        start = self.monotonic()
-
-        ignite.await_event("rebalanced=true, wasRebalanced=false",
-                           timeout_sec=AddNodeRebalanceTest.REBALANCE_TIMEOUT,
-                           from_the_beginning=True,
-                           backoff_sec=1)
-
-        data = {"Rebalanced in (sec)": self.monotonic() - start}
-
-        return data
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
new file mode 100644
index 0000000..fdf3792
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This package contains rebalance tests.
+"""
+
+# pylint: disable=W0622
+from ducktape.errors import TimeoutError
+
+from ignitetest.services.ignite import get_event_time
+from ignitetest.services.ignite_app import IgniteApplicationService
+
+
+# pylint: disable=too-many-arguments
+def preload_data(context, config, backups, cache_count, entry_count, entry_size, timeout=3600):
+    """
+    Puts entry_count of key-value pairs of entry_size bytes to cache_count caches.
+    :param context: Test context.
+    :param config: Ignite configuration.
+    :param backups: Cache backups count.
+    :param cache_count: Cache count.
+    :param entry_count: Cache entry count.
+    :param entry_size: Entry size in bytes.
+    :param timeout: Timeout in seconds for application finished.
+    :return: Time taken for data preloading.
+    """
+    app = IgniteApplicationService(
+        context,
+        config=config,
+        java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
+        params={"backups": backups, "cacheCount": cache_count, "entryCount": entry_count, "entrySize": entry_size},
+        startup_timeout_sec=timeout)
+    app.run()
+
+    return (get_event_time(
+        app, app.nodes[0], "Marking as initialized") - get_event_time(
+        app, app.nodes[0], "Data generation started")).total_seconds()
+
+
+def await_rebalance_start(ignite, timeout=1):
+    """
+    Awaits rebalance starting on any test-cache on any node.
+    :param ignite: IgniteService instance.
+    :param timeout: Rebalance start await timeout.
+    :return: dictionary of two keypairs with keys "node" and "time",
+    where "node" contains the first node on which rebalance start was detected
+    and "time" contains the time when rebalance was started.
+    """
+    for node in ignite.nodes:
+        try:
+            rebalance_start_time = get_event_time(
+                ignite, node,
+                "Starting rebalance routine \\[test-cache-",
+                timeout=timeout)
+        except TimeoutError:
+            continue
+        else:
+            return {"node": node, "time": rebalance_start_time}
+
+    raise RuntimeError("Rebalance start was not detected on any node")
+
+
+def await_rebalance_complete(ignite, node=None, cache_count=1, timeout=300):
+    """
+    Awaits rebalance complete on each test-cache.
+    :param ignite: IgniteService instance.
+    :param node: Ignite node in which rebalance will be awaited. If None, the first node in ignite will be used.
+    :param cache_count: The count of test caches to wait for rebalance completion.
+    :param timeout: Rebalance completion timeout.
+    :return: The time of rebalance completion.
+    """
+    rebalance_complete_times = []
+
+    for cache_idx in range(cache_count):
+        rebalance_complete_times.append(get_event_time(
+            ignite,
+            node if node else ignite.nodes[0],
+            "Completed rebalance future: RebalanceFuture \\[%s \\[grp=test-cache-%d" %
+            ("state=STARTED, grp=CacheGroupContext", cache_idx + 1),
+            timeout=timeout))
+
+    return max(rebalance_complete_times)
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
new file mode 100644
index 0000000..c950b84
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Module contains in-memory rebalance tests.
+"""
+from enum import IntEnum
+
+from ducktape.mark import defaults
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
+from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.enum import constructible
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import IgniteVersion, DEV_BRANCH, LATEST
+
+
+@constructible
+class TriggerEvent(IntEnum):
+    """
+    Event that triggers the rebalance: a node joining the cluster or a node leaving it.
+    """
+    NODE_JOIN = 0  # falsy: RebalanceInMemoryTest branches on the truthiness of the event
+    NODE_LEFT = 1  # truthy: the test stops one of the started nodes
+
+
+# pylint: disable=W0223
+class RebalanceInMemoryTest(IgniteTest):
+    """
+    Tests rebalance scenarios in in-memory mode.
+    """
+    NUM_NODES = 4
+    DEFAULT_DATA_REGION_SZ = 512 * 1024 * 1024
+
+    # pylint: disable=too-many-arguments, too-many-locals
+    @cluster(num_nodes=NUM_NODES)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST))
+    @defaults(trigger_event=[TriggerEvent.NODE_JOIN, TriggerEvent.NODE_LEFT],
+              backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+              rebalance_thread_pool_size=[None], rebalance_batch_size=[None],
+              rebalance_batches_prefetch_count=[None], rebalance_throttle=[None])
+    def test(self, ignite_version, trigger_event,
+             backups, cache_count, entry_count, entry_size,
+             rebalance_thread_pool_size, rebalance_batch_size,
+             rebalance_batches_prefetch_count, rebalance_throttle):
+        """
+        Test performs rebalance test which consists of following steps:
+            * Start cluster.
+            * Put data to it via IgniteClientApp.
+            * Triggering a rebalance event and awaits for rebalance to finish.
+        """
+        node_count = len(self.test_context.cluster) - 1
+
+        node_config = IgniteConfiguration(
+            version=IgniteVersion(ignite_version),
+            data_storage=DataStorageConfiguration(
+                default=DataRegionConfiguration(max_size=max(
+                    cache_count * entry_count * entry_size * (backups + 1),
+                    self.DEFAULT_DATA_REGION_SZ))),
+            rebalance_thread_pool_size=rebalance_thread_pool_size,
+            rebalance_batch_size=rebalance_batch_size,
+            rebalance_batches_prefetch_count=rebalance_batches_prefetch_count,
+            rebalance_throttle=rebalance_throttle)
+
+        ignites = IgniteService(self.test_context, config=node_config,
+                                num_nodes=node_count if trigger_event else node_count - 1)
+        ignites.start()
+
+        preload_time = preload_data(
+            self.test_context,
+            node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)),
+            backups, cache_count, entry_count, entry_size)
+
+        if trigger_event:
+            ignites.stop_node(ignites.nodes[node_count - 1])
+            ignite = ignites
+        else:
+            ignite = IgniteService(self.test_context, node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
+                                   num_nodes=1)
+            ignite.start()
+
+        start_node_and_time = await_rebalance_start(ignite)
+
+        complete_time = await_rebalance_complete(ignite, start_node_and_time["node"], cache_count)
+
+        return {"Rebalanced in (sec)": (complete_time - start_node_and_time["time"]).total_seconds(),
+                "Preloaded in (sec)": preload_time,
+                "Preload speed (MB/sec)": int(cache_count * entry_count * entry_size / 1000 / preload_time) / 1000.0}
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
index 7725391..43c2f0a 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
@@ -26,7 +26,7 @@ cellular_affinity:
   - ../cellular_affinity_test.py
 
 rebalance:
-  - ../add_node_rebalance_test.py
+  - ../rebalance/in_memory_test.py
 
 clients:
   - ../client_test.py