You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/29 09:36:05 UTC
[ignite] branch ignite-ducktape updated: IGNITE-14391 Multi-node
cache data preloading (#8927)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new ac544aa IGNITE-14391 Multi-node cache data preloading (#8927)
ac544aa is described below
commit ac544aab1d2d7ed939db2a7dcdc6d65817697227
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Mon Mar 29 12:35:09 2021 +0300
IGNITE-14391 Multi-node cache data preloading (#8927)
---
.../tests/rebalance/DataGenerationApplication.java | 35 ++++++--------
.../tests/ignitetest/tests/rebalance/__init__.py | 55 +++++++++++++++++-----
.../ignitetest/tests/rebalance/in_memory_test.py | 22 ++++-----
3 files changed, 67 insertions(+), 45 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
index ce3041c..c9d3545 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -29,14 +29,15 @@ import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
*/
public class DataGenerationApplication extends IgniteAwareApplication {
/** Max streamer data size. */
- private static final long MAX_STREAMER_DATA_SIZE = 100_000_000;
+ private static final int MAX_STREAMER_DATA_SIZE = 100_000_000;
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) throws Exception {
int backups = jsonNode.get("backups").asInt();
int cacheCnt = jsonNode.get("cacheCount").asInt();
- int entryCnt = jsonNode.get("entryCount").asInt();
int entrySize = jsonNode.get("entrySize").asInt();
+ int from = jsonNode.get("from").asInt();
+ int to = jsonNode.get("to").asInt();
markInitialized();
@@ -46,7 +47,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
new CacheConfiguration<Integer, DataModel>("test-cache-" + i)
.setBackups(backups));
- generateCacheData(cache.getName(), entryCnt, entrySize);
+ generateCacheData(cache.getName(), entrySize, from, to);
}
markFinished();
@@ -54,35 +55,27 @@ public class DataGenerationApplication extends IgniteAwareApplication {
/**
* @param cacheName Cache name.
- * @param entryCnt Entry count.
* @param entrySize Entry size.
+ * @param from From key.
+ * @param to To key.
*/
- private void generateCacheData(String cacheName, int entryCnt, int entrySize) {
- int logStreamedEntriesQuant = (int)Math.pow(10, (int)Math.log10(entryCnt) - 1);
+ private void generateCacheData(String cacheName, int entrySize, int from, int to) {
+ int flushEach = MAX_STREAMER_DATA_SIZE / entrySize + (MAX_STREAMER_DATA_SIZE % entrySize == 0 ? 0 : 1);
+ int logEach = (to - from) / 10;
try (IgniteDataStreamer<Integer, DataModel> stmr = ignite.dataStreamer(cacheName)) {
- for (int i = 0, n = 0; i < entryCnt; i++) {
+ for (int i = from; i < to; i++) {
stmr.addData(i, new DataModel(entrySize));
- int streamed = i + 1;
-
- if (streamed % logStreamedEntriesQuant == 0)
- log.info("Streamed " + streamed + " entries into " + cacheName);
-
- n += entrySize;
-
- if (n >= MAX_STREAMER_DATA_SIZE) {
- n = 0;
+ if ((i - from + 1) % logEach == 0 && log.isDebugEnabled())
+ log.debug("Streamed " + (i - from + 1) + " entries into " + cacheName);
+ if (i % flushEach == 0)
stmr.flush();
- }
}
}
- if (entryCnt % logStreamedEntriesQuant != 0)
- log.info("Streamed " + entryCnt + " entries into " + cacheName);
-
- log.info(cacheName + " data generated.");
+ log.info(cacheName + " data generated [entryCnt=" + (from - to) + ", from=" + from + ", to=" + to + "]");
}
/**
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
index 862b674..9ec1b96 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -28,11 +28,12 @@ from ignitetest.services.ignite_app import IgniteApplicationService
# pylint: disable=too-many-arguments
-def preload_data(context, config, backups, cache_count, entry_count, entry_size, timeout=3600):
+def preload_data(context, config, preloaders, backups, cache_count, entry_count, entry_size, timeout=3600):
"""
Puts entry_count of key-value pairs of entry_size bytes to cache_count caches.
:param context: Test context.
:param config: Ignite configuration.
+ :param preloaders: Preload client nodes count.
:param backups: Cache backups count.
:param cache_count: Cache count.
:param entry_count: Cache entry count.
@@ -40,18 +41,46 @@ def preload_data(context, config, backups, cache_count, entry_count, entry_size,
:param timeout: Timeout in seconds for application finished.
:return: Time taken for data preloading.
"""
- app = IgniteApplicationService(
- context,
- config=config,
- java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
- params={"backups": backups, "cacheCount": cache_count, "entryCount": entry_count, "entrySize": entry_size},
- startup_timeout_sec=timeout)
- app.run()
-
- app.await_started()
- app.await_stopped()
-
- return (app.get_finish_time() - app.get_init_time()).total_seconds()
+ assert preloaders > 0
+ assert cache_count > 0
+ assert entry_count > 0
+ assert entry_size > 0
+
+ apps = []
+
+ def start_app(from_, to_):
+ app0 = IgniteApplicationService(
+ context,
+ config=config,
+ java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication",
+ params={
+ "backups": backups,
+ "cacheCount": cache_count,
+ "entrySize": entry_size,
+ "from": from_,
+ "to": to_
+ },
+ shutdown_timeout_sec=timeout)
+ app0.start_async()
+
+ apps.append(app0)
+
+ count = int(entry_count / preloaders)
+ _from = 0
+ _to = 0
+
+ for _ in range(preloaders - 1):
+ _from = _to
+ _to += count
+ start_app(_from, _to)
+
+ start_app(_to, entry_count)
+
+ for app1 in apps:
+ app1.await_stopped()
+
+ return (max(map(lambda app: app.get_finish_time(), apps)) -
+ min(map(lambda app: app.get_init_time(), apps))).total_seconds()
def await_rebalance_start(ignite, timeout=1):
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index 5056b7c..74b7921 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -52,36 +52,36 @@ class RebalanceInMemoryTest(IgniteTest):
# pylint: disable=too-many-arguments, too-many-locals
@cluster(num_nodes=NUM_NODES)
@ignite_versions(str(DEV_BRANCH), str(LATEST))
- @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+ @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
def test_node_join(self, ignite_version,
- backups, cache_count, entry_count, entry_size,
+ backups, cache_count, entry_count, entry_size, preloaders,
thread_pool_size, batch_size, batches_prefetch_count, throttle):
"""
Tests rebalance on node join.
"""
return self.__run(ignite_version, TriggerEvent.NODE_JOIN,
- backups, cache_count, entry_count, entry_size,
+ backups, cache_count, entry_count, entry_size, preloaders,
thread_pool_size, batch_size, batches_prefetch_count, throttle)
# pylint: disable=too-many-arguments, too-many-locals
@cluster(num_nodes=NUM_NODES)
@ignite_versions(str(DEV_BRANCH), str(LATEST))
- @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000],
+ @defaults(backups=[1], cache_count=[1], entry_count=[15_000], entry_size=[50_000], preloaders=[1],
thread_pool_size=[None], batch_size=[None], batches_prefetch_count=[None], throttle=[None])
def test_node_left(self, ignite_version,
- backups, cache_count, entry_count, entry_size,
+ backups, cache_count, entry_count, entry_size, preloaders,
thread_pool_size, batch_size, batches_prefetch_count, throttle):
"""
Tests rebalance on node left.
"""
return self.__run(ignite_version, TriggerEvent.NODE_LEFT,
- backups, cache_count, entry_count, entry_size,
+ backups, cache_count, entry_count, entry_size, preloaders,
thread_pool_size, batch_size, batches_prefetch_count, throttle)
# pylint: disable=too-many-arguments, too-many-locals
def __run(self, ignite_version, trigger_event,
- backups, cache_count, entry_count, entry_size,
+ backups, cache_count, entry_count, entry_size, preloaders,
thread_pool_size, batch_size, batches_prefetch_count, throttle):
"""
Test performs rebalance test which consists of following steps:
@@ -94,13 +94,14 @@ class RebalanceInMemoryTest(IgniteTest):
:param cache_count: Cache count.
:param entry_count: Cache entry count.
:param entry_size: Cache entry size.
+ :param preloaders: Preload application nodes count.
:param thread_pool_size: rebalanceThreadPoolSize config property.
:param batch_size: rebalanceBatchSize config property.
:param batches_prefetch_count: rebalanceBatchesPrefetchCount config property.
:param throttle: rebalanceThrottle config property.
:return: Rebalance and data preload stats.
"""
- node_count = len(self.test_context.cluster) - 1
+ node_count = len(self.test_context.cluster) - preloaders
node_config = IgniteConfiguration(
version=IgniteVersion(ignite_version),
@@ -121,7 +122,7 @@ class RebalanceInMemoryTest(IgniteTest):
preload_time = preload_data(
self.test_context,
node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)),
- backups, cache_count, entry_count, entry_size)
+ preloaders, backups, cache_count, entry_count, entry_size)
if trigger_event:
ignites.stop_node(ignites.nodes[node_count - 1])
@@ -148,6 +149,5 @@ class RebalanceInMemoryTest(IgniteTest):
"Rebalance speed (Total, MiB/sec)": speed((stats.end_time - stats.start_time).total_seconds()),
"Rebalance speed (Average per node, MiB/sec)": speed(stats.duration),
"Preloaded in (sec)": preload_time,
- "Preload speed (MiB/sec)":
- int(cache_count * entry_count * entry_size * 1000 / 1024 / 1024 / preload_time) / 1000.0
+ "Preloaded data size (MiB)": round(cache_count * entry_count * entry_size / (1024 * 1024), 3)
}