You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/15 18:06:51 UTC
[ignite] branch ignite-ducktape updated: IGNITE-14228 Ducktape test
of rebalance for in-memory mode (#8832)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 572133b IGNITE-14228 Ducktape test of rebalance for in-memory mode (#8832)
572133b is described below
commit 572133b1b9978e35b93ea5a10abc090fa7813d70
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Mon Mar 15 21:06:31 2021 +0300
IGNITE-14228 Ducktape test of rebalance for in-memory mode (#8832)
---
.../ducktest/tests/DataGenerationApplication.java | 46 --------
.../tests/rebalance/DataGenerationApplication.java | 119 +++++++++++++++++++++
.../utils/ignite_configuration/__init__.py | 4 +
.../services/utils/templates/ignite.xml.j2 | 12 +++
.../ignitetest/tests/add_node_rebalance_test.py | 74 -------------
.../tests/ignitetest/tests/rebalance/__init__.py | 95 ++++++++++++++++
.../ignitetest/tests/rebalance/in_memory_test.py | 104 ++++++++++++++++++
.../tests/ignitetest/tests/suites/fast_suite.yml | 2 +-
8 files changed, 335 insertions(+), 121 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
deleted file mode 100644
index a65644a..0000000
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.ducktest.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
-
-/**
- *
- */
-public class DataGenerationApplication extends IgniteAwareApplication {
- /** {@inheritDoc} */
- @Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
-
- IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
-
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
-
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
- }
- }
-
- markSyncExecutionComplete();
- }
-}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
new file mode 100644
index 0000000..1e454af
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.rebalance;
+
+import java.util.concurrent.ThreadLocalRandom;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Application generates cache data by the "backups", "cacheCount", "entryCount" and "entrySize" parameters.
+ */
+public class DataGenerationApplication extends IgniteAwareApplication {
+    /** Payload volume in bytes (sum of entry sizes) after which the data streamer is flushed explicitly. */
+    private static final long MAX_STREAMER_DATA_SIZE = 100_000_000;
+
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        int backups = jsonNode.get("backups").asInt();
+        int cacheCnt = jsonNode.get("cacheCount").asInt();
+        int entryCnt = jsonNode.get("entryCount").asInt();
+        int entrySize = jsonNode.get("entrySize").asInt();
+
+        log.info("Data generation started [backups=" + backups + ", cacheCount=" + cacheCnt
+            + ", entryCount=" + entryCnt + ", entrySize=" + entrySize + "]");
+
+        for (int i = 1; i <= cacheCnt; i++) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-14319
+            IgniteCache<Integer, DataModel> cache = ignite.getOrCreateCache(
+                new CacheConfiguration<Integer, DataModel>("test-cache-" + i)
+                    .setBackups(backups));
+
+            generateCacheData(cache.getName(), entryCnt, entrySize);
+        }
+
+        markSyncExecutionComplete();
+    }
+
+    /**
+     * Streams {@code entryCnt} entries into the cache via a data streamer and logs the progress.
+     * @param cacheName Cache name.
+     * @param entryCnt Entry count.
+     * @param entrySize Entry size.
+     */
+    private void generateCacheData(String cacheName, int entryCnt, int entrySize) {
+        // The raw quant is 0 for entryCnt < 10, clamp it to 1 to avoid division by zero in '%' below.
+        int logStreamedEntriesQuant = Math.max(1, (int)Math.pow(10, (int)Math.log10(entryCnt) - 1));
+
+        try (IgniteDataStreamer<Integer, DataModel> stmr = ignite.dataStreamer(cacheName)) {
+            for (int i = 0, n = 0; i < entryCnt; i++) {
+                stmr.addData(i, new DataModel(entrySize));
+                int streamed = i + 1;
+
+                if (streamed % logStreamedEntriesQuant == 0)
+                    log.info("Streamed " + streamed + " entries into " + cacheName);
+
+                n += entrySize;
+                if (n >= MAX_STREAMER_DATA_SIZE) {
+                    n = 0;
+
+                    stmr.flush();
+                }
+            }
+        }
+
+        if (entryCnt % logStreamedEntriesQuant != 0)
+            log.info("Streamed " + entryCnt + " entries into " + cacheName);
+
+        log.info(cacheName + " data generated.");
+    }
+
+    /**
+     * Data model class, which instances are used as cache entry values.
+     */
+    private static class DataModel {
+        /** Last generated payload, shared by all instances while the requested size does not change. */
+        private static byte[] cachedPayload;
+
+        /** Payload. */
+        private final byte[] payload;
+
+        /**
+         * @param entrySize Entry size.
+         */
+        DataModel(int entrySize) {
+            payload = getPayload(entrySize);
+        }
+
+        /**
+         * @param payloadSize Payload size.
+         * @return Cached random payload, regenerated only when the cached array has a different length.
+         */
+        private static byte[] getPayload(int payloadSize) {
+            if (cachedPayload == null || cachedPayload.length != payloadSize) {
+                cachedPayload = new byte[payloadSize];
+                ThreadLocalRandom.current().nextBytes(cachedPayload);
+            }
+
+            return cachedPayload;
+        }
+    }
+}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index e0d1494..c430d0f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -48,6 +48,10 @@ class IgniteConfiguration(NamedTuple):
auth_enabled: bool = False
plugins: list = []
metric_exporter: str = None
+ rebalance_thread_pool_size: int = None
+ rebalance_batch_size: int = None
+ rebalance_batches_prefetch_count: int = None
+ rebalance_throttle: int = None
class IgniteClientConfiguration(IgniteConfiguration):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index a25c659..d7bf18f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -42,6 +42,18 @@
<property name="failureDetectionTimeout" value="{{ config.failure_detection_timeout }}"/>
<property name="systemWorkerBlockedTimeout" value="{{ config.sys_worker_blocked_timeout }}"/>
<property name="authenticationEnabled" value="{{ config.auth_enabled | lower }}"/>
+ {% if config.rebalance_thread_pool_size %}
+ <property name="rebalanceThreadPoolSize" value="{{ config.rebalance_thread_pool_size }}"/>
+ {% endif %}
+ {% if config.rebalance_batch_size %}
+ <property name="rebalanceBatchSize" value="{{ config.rebalance_batch_size }}"/>
+ {% endif %}
+ {% if config.rebalance_batches_prefetch_count %}
+ <property name="rebalanceBatchesPrefetchCount" value="{{ config.rebalance_batches_prefetch_count }}"/>
+ {% endif %}
+ {% if config.rebalance_throttle %}
+ <property name="rebalanceThrottle" value="{{ config.rebalance_throttle }}"/>
+ {% endif %}
{% if config.metric_exporter %}
<property name="metricExporterSpi">
diff --git a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
deleted file mode 100644
index 5511805..0000000
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Module contains node rebalance tests.
-"""
-
-from ignitetest.services.ignite import IgniteService
-from ignitetest.services.ignite_app import IgniteApplicationService
-from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
-from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
-from ignitetest.utils import ignite_versions, cluster
-from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST
-
-
-# pylint: disable=W0223
-class AddNodeRebalanceTest(IgniteTest):
- """
- Test basic rebalance scenarios.
- """
- NUM_NODES = 4
- PRELOAD_TIMEOUT = 60
- DATA_AMOUNT = 1000000
- REBALANCE_TIMEOUT = 60
-
- @cluster(num_nodes=NUM_NODES + 1)
- @ignite_versions(str(DEV_BRANCH), str(LATEST))
- def test_add_node(self, ignite_version):
- """
- Test performs add node rebalance test which consists of following steps:
- * Start cluster.
- * Put data to it via IgniteClientApp.
- * Start one more node and awaits for rebalance to finish.
- """
- node_config = IgniteConfiguration(version=IgniteVersion(ignite_version))
-
- ignites = IgniteService(self.test_context, config=node_config, num_nodes=self.NUM_NODES - 1)
- ignites.start()
-
- # This client just put some data to the cache.
- app_config = node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites))
- IgniteApplicationService(self.test_context, config=app_config,
- java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
- params={"cacheName": "test-cache", "range": self.DATA_AMOUNT},
- startup_timeout_sec=self.PRELOAD_TIMEOUT).run()
-
- ignite = IgniteService(self.test_context, node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
- num_nodes=1)
-
- ignite.start()
-
- start = self.monotonic()
-
- ignite.await_event("rebalanced=true, wasRebalanced=false",
- timeout_sec=AddNodeRebalanceTest.REBALANCE_TIMEOUT,
- from_the_beginning=True,
- backoff_sec=1)
-
- data = {"Rebalanced in (sec)": self.monotonic() - start}
-
- return data
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
new file mode 100644
index 0000000..fdf3792
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This package contains rebalance tests.
+"""
+
+# pylint: disable=W0622
+from ducktape.errors import TimeoutError
+
+from ignitetest.services.ignite import get_event_time
+from ignitetest.services.ignite_app import IgniteApplicationService
+
+
+# pylint: disable=too-many-arguments
+def preload_data(context, config, backups, cache_count, entry_count, entry_size, timeout=3600):
+ """
+ Puts entry_count of key-value pairs of entry_size bytes to cache_count caches.
+ :param context: Test context.
+ :param config: Ignite configuration.
+ :param backups: Cache backups count.
+ :param cache_count: Cache count.
+ :param entry_count: Cache entry count.
+ :param entry_size: Entry size in bytes.
+ :param timeout: Timeout in seconds for application finished.
+ :return: Time taken for data preloading.
+ """
+ app = IgniteApplicationService(
+ context,
+ config=config,
+ java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
+ params={"backups": backups, "cacheCount": cache_count, "entryCount": entry_count, "entrySize": entry_size},
+ startup_timeout_sec=timeout)
+ app.run()
+
+ return (get_event_time(
+ app, app.nodes[0], "Marking as initialized") - get_event_time(
+ app, app.nodes[0], "Data generation started")).total_seconds()
+
+
+def await_rebalance_start(ignite, timeout=1):
+ """
+ Awaits rebalance starting on any test-cache on any node.
+ :param ignite: IgniteService instance.
+ :param timeout: Rebalance start await timeout.
+ :return: dictionary of two keypairs with keys "node" and "time",
+ where "node" contains the first node on which rebalance start was detected
+ and "time" contains the time when rebalance was started.
+ """
+ for node in ignite.nodes:
+ try:
+ rebalance_start_time = get_event_time(
+ ignite, node,
+ "Starting rebalance routine \\[test-cache-",
+ timeout=timeout)
+ except TimeoutError:
+ continue
+ else:
+ return {"node": node, "time": rebalance_start_time}
+
+ raise RuntimeError("Rebalance start was not detected on any node")
+
+
+def await_rebalance_complete(ignite, node=None, cache_count=1, timeout=300):
+ """
+ Awaits rebalance complete on each test-cache.
+ :param ignite: IgniteService instance.
+ :param node: Ignite node in which rebalance will be awaited. If None, the first node in ignite will be used.
+ :param cache_count: The count of test caches to wait for rebalance completion.
+ :param timeout: Rebalance completion timeout.
+ :return: The time of rebalance completion.
+ """
+ rebalance_complete_times = []
+
+ for cache_idx in range(cache_count):
+ rebalance_complete_times.append(get_event_time(
+ ignite,
+ node if node else ignite.nodes[0],
+ "Completed rebalance future: RebalanceFuture \\[%s \\[grp=test-cache-%d" %
+ ("state=STARTED, grp=CacheGroupContext", cache_idx + 1),
+ timeout=timeout))
+
+ return max(rebalance_complete_times)
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
new file mode 100644
index 0000000..c950b84
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Module contains in-memory rebalance tests.
+"""
+from enum import IntEnum
+
+from ducktape.mark import defaults
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
+from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.enum import constructible
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import IgniteVersion, DEV_BRANCH, LATEST
+
+
+@constructible
+class TriggerEvent(IntEnum):
+ """
+ Event that triggers the rebalance under test; the test branches on truthiness, so NODE_JOIN must stay 0.
+ """
+ NODE_JOIN = 0  # a new node is started after the data is preloaded
+ NODE_LEFT = 1  # one of the running nodes is stopped after the data is preloaded
+
+
+# pylint: disable=W0223
+class RebalanceInMemoryTest(IgniteTest):
+ """
+ Tests rebalance scenarios in in-memory mode.
+ """
+ NUM_NODES = 4  # number of nodes requested via the @cluster decorator below
+ DEFAULT_DATA_REGION_SZ = 512 * 1024 * 1024  # 512 MiB, lower bound of the default data region max size
+
+ # pylint: disable=too-many-arguments, too-many-locals
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST))
+ @defaults(trigger_event=[TriggerEvent.NODE_JOIN, TriggerEvent.NODE_LEFT],
+ backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+ rebalance_thread_pool_size=[None], rebalance_batch_size=[None],
+ rebalance_batches_prefetch_count=[None], rebalance_throttle=[None])
+ def test(self, ignite_version, trigger_event,
+ backups, cache_count, entry_count, entry_size,
+ rebalance_thread_pool_size, rebalance_batch_size,
+ rebalance_batches_prefetch_count, rebalance_throttle):
+ """
+ Test performs rebalance test which consists of following steps:
+ * Start cluster.
+ * Preload data to it via DataGenerationApplication started in client mode.
+ * Trigger a rebalance event (node join or node left), await for rebalance to finish and return the timings.
+ """
+ node_count = len(self.test_context.cluster) - 1  # presumably one node is reserved for the data generation app
+
+ node_config = IgniteConfiguration(
+ version=IgniteVersion(ignite_version),
+ data_storage=DataStorageConfiguration(
+ default=DataRegionConfiguration(max_size=max(
+ cache_count * entry_count * entry_size * (backups + 1),  # payload bytes of all caches with backups
+ self.DEFAULT_DATA_REGION_SZ))),
+ rebalance_thread_pool_size=rebalance_thread_pool_size,
+ rebalance_batch_size=rebalance_batch_size,
+ rebalance_batches_prefetch_count=rebalance_batches_prefetch_count,
+ rebalance_throttle=rebalance_throttle)
+
+ ignites = IgniteService(self.test_context, config=node_config,
+ num_nodes=node_count if trigger_event else node_count - 1)  # NODE_JOIN (0): one node less, it joins later
+ ignites.start()
+
+ preload_time = preload_data(
+ self.test_context,
+ node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)),
+ backups, cache_count, entry_count, entry_size)
+
+ if trigger_event:  # NODE_LEFT: stop the last node and watch the remaining service
+ ignites.stop_node(ignites.nodes[node_count - 1])
+ ignite = ignites
+ else:  # NODE_JOIN: start one more node that discovers the running cluster and watch it
+ ignite = IgniteService(self.test_context, node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
+ num_nodes=1)
+ ignite.start()
+
+ start_node_and_time = await_rebalance_start(ignite)
+
+ complete_time = await_rebalance_complete(ignite, start_node_and_time["node"], cache_count)
+
+ return {"Rebalanced in (sec)": (complete_time - start_node_and_time["time"]).total_seconds(),
+ "Preloaded in (sec)": preload_time,
+ "Preload speed (MB/sec)": int(cache_count * entry_count * entry_size / 1000 / preload_time) / 1000.0}
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
index 7725391..43c2f0a 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
@@ -26,7 +26,7 @@ cellular_affinity:
- ../cellular_affinity_test.py
rebalance:
- - ../add_node_rebalance_test.py
+ - ../rebalance/in_memory_test.py
clients:
- ../client_test.py