You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/23 15:52:06 UTC
[ignite] branch ignite-ducktape updated: IGNITE-14300 Rebalance
speed as part of test result data (#8891)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 6933aa9 IGNITE-14300 Rebalance speed as part of test result data (#8891)
6933aa9 is described below
commit 6933aa97970cf84e6b73898843647ff8c563886b
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Tue Mar 23 18:51:40 2021 +0300
IGNITE-14300 Rebalance speed as part of test result data (#8891)
---
.../tests/rebalance/DataGenerationApplication.java | 5 +-
.../ducktests/tests/ignitetest/services/ignite.py | 5 +-
.../tests/ignitetest/services/ignite_app.py | 27 +++++--
.../ignitetest/services/utils/ignite_aware.py | 26 +++++++
.../tests/ignitetest/tests/rebalance/__init__.py | 82 ++++++++++++++++++++--
.../ignitetest/tests/rebalance/in_memory_test.py | 26 +++++--
6 files changed, 150 insertions(+), 21 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
index 1e454af..ce3041c 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -38,8 +38,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
int entryCnt = jsonNode.get("entryCount").asInt();
int entrySize = jsonNode.get("entrySize").asInt();
- log.info("Data generation started [backups=" + backups + ", cacheCount=" + cacheCnt
- + ", entryCount=" + entryCnt + ", entrySize=" + entrySize + "]");
+ markInitialized();
for (int i = 1; i <= cacheCnt; i++) {
// TODO https://issues.apache.org/jira/browse/IGNITE-14319
@@ -50,7 +49,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
generateCacheData(cache.getName(), entryCnt, entrySize);
}
- markSyncExecutionComplete();
+ markFinished();
}
/**
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index 483ef2c..13e8f5c 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,9 +17,7 @@
This module contains class to start ignite cluster node.
"""
-import re
import signal
-from datetime import datetime
from ducktape.cluster.remoteaccount import RemoteCommandError
@@ -83,5 +81,4 @@ def get_event_time(service, log_node, log_pattern, from_the_beginning=True, time
_, stdout, _ = log_node.account.ssh_client.exec_command(
"grep '%s' %s" % (log_pattern, log_node.log_file))
- return datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8")).group(),
- "[%Y-%m-%d %H:%M:%S,%f]")
+ return IgniteAwareService.event_time(log_pattern, log_node)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index a77f6e3..689ce53 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -31,6 +31,9 @@ class IgniteApplicationService(IgniteAwareService):
"""
SERVICE_JAVA_CLASS_NAME = "org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"
+ APP_INIT_EVT_MSG = "IGNITE_APPLICATION_INITIALIZED"
+ APP_FINISH_EVT_MSG = "IGNITE_APPLICATION_FINISHED"
+ APP_BROKEN_EVT_MSG = "IGNITE_APPLICATION_BROKEN"
# pylint: disable=R0913
def __init__(self, context, config, java_class_name, num_nodes=1, params="", startup_timeout_sec=60,
@@ -47,18 +50,18 @@ class IgniteApplicationService(IgniteAwareService):
def await_started(self):
+ """
+ Awaits the base service start, then waits up to startup_timeout_sec for the application to log
+ APP_INIT_EVT_MSG or APP_BROKEN_EVT_MSG; __check_status raises if the broken marker is present.
+ """
super().await_started()
- self.__check_status("IGNITE_APPLICATION_INITIALIZED", timeout=self.startup_timeout_sec)
+ self.__check_status(self.APP_INIT_EVT_MSG, timeout=self.startup_timeout_sec)
def await_stopped(self):
+ """
+ Awaits the base service stop, then waits (with __check_status's default 1 s timeout) for the
+ application to log APP_FINISH_EVT_MSG or APP_BROKEN_EVT_MSG; raises if the broken marker is present.
+ """
super().await_stopped()
- self.__check_status("IGNITE_APPLICATION_FINISHED")
+ self.__check_status(self.APP_FINISH_EVT_MSG)
def __check_status(self, desired, timeout=1):
- self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, from_the_beginning=True)
+ self.await_event("%s\\|%s" % (desired, self.APP_BROKEN_EVT_MSG), timeout, from_the_beginning=True)
try:
- self.await_event("IGNITE_APPLICATION_BROKEN", 1, from_the_beginning=True)
+ self.await_event(self.APP_BROKEN_EVT_MSG, 1, from_the_beginning=True)
raise IgniteExecutionException("Java application execution failed. %s" % self.extract_result("ERROR"))
except TimeoutError:
pass
@@ -68,6 +71,22 @@ class IgniteApplicationService(IgniteAwareService):
except Exception:
raise Exception("Java application execution failed.") from None
+    def get_init_time(self, selector=min):
+        """
+        Returns the moment the application logged its initialization marker.
+        :param selector: Picks one time among the per-node init times; min (the earliest) by default.
+        :return: Selected initialization time, or None if no node has logged the marker.
+        """
+        init_marker = self.APP_INIT_EVT_MSG
+        return self.get_event_time(init_marker, selector=selector)
+
+    def get_finish_time(self, selector=max):
+        """
+        Returns the moment the application logged its finish marker.
+        :param selector: Picks one time among the per-node finish times; max (the latest) by default.
+        :return: Selected finish time, or None if no node has logged the marker.
+        """
+        finish_marker = self.APP_FINISH_EVT_MSG
+        return self.get_event_time(finish_marker, selector=selector)
+
def clean_node(self, node, **kwargs):
if self.alive(node):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 794cdf1..2b8e0d2 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -17,6 +17,7 @@
This module contains the base class to build services aware of Ignite.
"""
import os
+import re
import signal
import socket
import sys
@@ -275,6 +276,31 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
self.await_event_on_node(evt_message, node, timeout_sec, from_the_beginning=from_the_beginning,
backoff_sec=backoff_sec)
+    @staticmethod
+    def event_time(evt_message, node):
+        """
+        Parses the timestamp of the first line in the node's log file that matches a pattern.
+        :param evt_message: Pattern passed to grep when searching the log.
+        :param node: Ducktape node whose log file is searched.
+        :return: datetime of the first matching log line, or None if nothing matched
+                 (or the output does not start with a bracketed timestamp).
+        """
+        grep_cmd = "grep '%s' %s" % (evt_message, node.log_file)
+        _, out, _ = node.account.ssh_client.exec_command(grep_cmd)
+        log_lines = out.read().decode("utf-8")
+
+        # Log lines start with "[yyyy-MM-dd HH:mm:ss,SSS]"; only the head of grep's output is inspected.
+        stamp = re.match("^\\[[^\\[]+\\]", log_lines)
+        if stamp is None:
+            return None
+
+        return datetime.strptime(stamp.group(), "[%Y-%m-%d %H:%M:%S,%f]")
+
+    def get_event_time(self, evt_message, selector=max):
+        """
+        Gets the time of the specific event across all nodes of the service, reduced by selector.
+        Nodes whose log does not contain the event are ignored.
+        :param evt_message: Event message (grep pattern) to search the nodes' logs for.
+        :param selector: Function choosing one time among the found ones; default is max (the latest).
+                         It must accept an iterable plus a ``default`` keyword argument, as min/max do.
+        :return: Selected event time, or None if no node logged the event.
+        """
+        found_times = (self.event_time(evt_message, node) for node in self.nodes)
+
+        return selector((evt_time for evt_time in found_times if evt_time is not None), default=None)
+
def exec_on_nodes_async(self, nodes, task, simultaneously=True, delay_ms=0, timeout_sec=20):
"""
Executes given task on the nodes.
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
index fdf3792..862b674 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -17,6 +17,9 @@
This package contains rebalance tests.
"""
+from datetime import datetime
+from typing import NamedTuple
+
# pylint: disable=W0622
from ducktape.errors import TimeoutError
@@ -45,9 +48,10 @@ def preload_data(context, config, backups, cache_count, entry_count, entry_size,
startup_timeout_sec=timeout)
app.run()
- return (get_event_time(
- app, app.nodes[0], "Marking as initialized") - get_event_time(
- app, app.nodes[0], "Data generation started")).total_seconds()
+ app.await_started()
+ app.await_stopped()
+
+ return (app.get_finish_time() - app.get_init_time()).total_seconds()
def await_rebalance_start(ignite, timeout=1):
@@ -68,7 +72,7 @@ def await_rebalance_start(ignite, timeout=1):
except TimeoutError:
continue
else:
- return {"node": node, "time": rebalance_start_time}
+ return node, rebalance_start_time
raise RuntimeError("Rebalance start was not detected on any node")
@@ -93,3 +97,73 @@ def await_rebalance_complete(ignite, node=None, cache_count=1, timeout=300):
timeout=timeout))
return max(rebalance_complete_times)
+
+
+def get_rebalance_metrics(node, cache_group):
+    """
+    Reads the rebalance metrics of one cache group from a node via its JMX MBean.
+    :param node: Ignite node to query.
+    :param cache_group: Cache group name.
+    :return: RebalanceMetrics with received bytes, start/end times and duration in seconds
+             (duration is 0 unless both start and end times are known).
+    """
+    group_mbean = node.jmx_client().find_mbean('.*group=cacheGroups.*name="%s"' % cache_group)
+
+    def read_long(attr):
+        # Each MBean attribute is exposed as an iterator; take its current value.
+        return int(next(getattr(group_mbean, attr)))
+
+    started = to_datetime(read_long("RebalancingStartTime"))
+    finished = to_datetime(read_long("RebalancingEndTime"))
+    received = read_long("RebalancingReceivedBytes")
+
+    if started and finished:
+        elapsed = (finished - started).total_seconds()
+    else:
+        elapsed = 0
+
+    return RebalanceMetrics(received_bytes=received, start_time=started, end_time=finished, duration=elapsed)
+
+
+def to_datetime(timestamp):
+    """
+    Converts a timestamp in milliseconds since the epoch to a naive local-time datetime.
+    :param timestamp: Timestamp in milliseconds; the value -1 is treated as "no value".
+    :return: datetime constructed from timestamp, or None if timestamp == -1.
+    """
+    if timestamp == -1:
+        return None
+
+    return datetime.fromtimestamp(timestamp / 1000.0)
+
+
+class RebalanceMetrics(NamedTuple):
+ """
+ Rebalance metrics of one cache group on one node, or an aggregate over several such entries.
+ """
+ # Bytes received by rebalance (RebalancingReceivedBytes MBean value, or a sum of them).
+ received_bytes: int = 0
+ # Rebalance start time; None when unavailable (the metric reported -1).
+ start_time: datetime = None
+ # Rebalance end time; None when unavailable (the metric reported -1).
+ end_time: datetime = None
+ # Rebalance duration in seconds (a sum of per-group durations when aggregated).
+ duration: float = 0
+
+
+def aggregate_rebalance_stats(nodes, cache_count):
+    """
+    Combines per-node, per-cache rebalance metrics into a single summary.
+    Caches are looked up as test-cache-1 .. test-cache-<cache_count> on every node:
+      received_bytes -> total over all nodes and caches
+      start_time -> earliest known start time (None if no start time is known)
+      end_time -> latest known end time (None if no end time is known)
+      duration -> total of all durations
+    :param nodes: Nodes list.
+    :param cache_count: Cache count.
+    :return: RebalanceMetrics instance with aggregated values.
+    """
+    all_metrics = [get_rebalance_metrics(node, "test-cache-%d" % idx)
+                   for node in nodes
+                   for idx in range(1, cache_count + 1)]
+
+    known_starts = [m.start_time for m in all_metrics if m.start_time is not None]
+    known_ends = [m.end_time for m in all_metrics if m.end_time is not None]
+
+    return RebalanceMetrics(
+        received_bytes=sum(m.received_bytes for m in all_metrics),
+        start_time=min(known_starts, default=None),
+        end_time=max(known_ends, default=None),
+        duration=sum(m.duration for m in all_metrics))
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index c950b84..daa68f1 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -24,7 +24,8 @@ from ignitetest.services.ignite import IgniteService
from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
-from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete
+from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete, \
+ aggregate_rebalance_stats
from ignitetest.utils import cluster, ignite_versions
from ignitetest.utils.enum import constructible
from ignitetest.utils.ignite_test import IgniteTest
@@ -73,6 +74,7 @@ class RebalanceInMemoryTest(IgniteTest):
default=DataRegionConfiguration(max_size=max(
cache_count * entry_count * entry_size * (backups + 1),
self.DEFAULT_DATA_REGION_SZ))),
+ metric_exporter="org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi",
rebalance_thread_pool_size=rebalance_thread_pool_size,
rebalance_batch_size=rebalance_batch_size,
rebalance_batches_prefetch_count=rebalance_batches_prefetch_count,
@@ -95,10 +97,22 @@ class RebalanceInMemoryTest(IgniteTest):
num_nodes=1)
ignite.start()
- start_node_and_time = await_rebalance_start(ignite)
+ start_node, start_time = await_rebalance_start(ignite)
- complete_time = await_rebalance_complete(ignite, start_node_and_time["node"], cache_count)
+ end_time = await_rebalance_complete(ignite, start_node, cache_count)
- return {"Rebalanced in (sec)": (complete_time - start_node_and_time["time"]).total_seconds(),
- "Preloaded in (sec)": preload_time,
- "Preload speed (MB/sec)": int(cache_count * entry_count * entry_size / 1000 / preload_time) / 1000.0}
+ rebalance_nodes = ignite.nodes[:-1] if trigger_event else ignite.nodes
+
+ stats = aggregate_rebalance_stats(rebalance_nodes, cache_count)
+
+ def speed(d): return (int(stats.received_bytes / d * 1000 / 1024 / 1024) / 1000.0) if d else None
+
+ return {
+ "Rebalanced in (sec)": (end_time - start_time).total_seconds(),
+ "Rebalance nodes": len(rebalance_nodes),
+ "Rebalance speed (Total, MiB/sec)": speed((stats.end_time - stats.start_time).total_seconds()),
+ "Rebalance speed (Average per node, MiB/sec)": speed(stats.duration),
+ "Preloaded in (sec)": preload_time,
+ "Preload speed (MiB/sec)":
+ int(cache_count * entry_count * entry_size * 1000 / 1024 / 1024 / preload_time) / 1000.0
+ }