You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/03/23 15:52:06 UTC

[ignite] branch ignite-ducktape updated: IGNITE-14300 Rebalance speed as part of test result data (#8891)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 6933aa9  IGNITE-14300 Rebalance speed as part of test result data (#8891)
6933aa9 is described below

commit 6933aa97970cf84e6b73898843647ff8c563886b
Author: Dmitriy Sorokin <sb...@gmail.com>
AuthorDate: Tue Mar 23 18:51:40 2021 +0300

    IGNITE-14300 Rebalance speed as part of test result data (#8891)
---
 .../tests/rebalance/DataGenerationApplication.java |  5 +-
 .../ducktests/tests/ignitetest/services/ignite.py  |  5 +-
 .../tests/ignitetest/services/ignite_app.py        | 27 +++++--
 .../ignitetest/services/utils/ignite_aware.py      | 26 +++++++
 .../tests/ignitetest/tests/rebalance/__init__.py   | 82 ++++++++++++++++++++--
 .../ignitetest/tests/rebalance/in_memory_test.py   | 26 +++++--
 6 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
index 1e454af..ce3041c 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/rebalance/DataGenerationApplication.java
@@ -38,8 +38,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
         int entryCnt = jsonNode.get("entryCount").asInt();
         int entrySize = jsonNode.get("entrySize").asInt();
 
-        log.info("Data generation started [backups=" + backups + ", cacheCount=" + cacheCnt
-            + ", entryCount=" + entryCnt + ", entrySize=" + entrySize + "]");
+        markInitialized();
 
         for (int i = 1; i <= cacheCnt; i++) {
             // TODO https://issues.apache.org/jira/browse/IGNITE-14319
@@ -50,7 +49,7 @@ public class DataGenerationApplication extends IgniteAwareApplication {
             generateCacheData(cache.getName(), entryCnt, entrySize);
         }
 
-        markSyncExecutionComplete();
+        markFinished();
     }
 
     /**
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index 483ef2c..13e8f5c 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,9 +17,7 @@
 This module contains class to start ignite cluster node.
 """
 
-import re
 import signal
-from datetime import datetime
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
@@ -83,5 +81,4 @@ def get_event_time(service, log_node, log_pattern, from_the_beginning=True, time
     _, stdout, _ = log_node.account.ssh_client.exec_command(
         "grep '%s' %s" % (log_pattern, log_node.log_file))
 
-    return datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8")).group(),
-                             "[%Y-%m-%d %H:%M:%S,%f]")
+    return IgniteAwareService.event_time(log_pattern, log_node)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index a77f6e3..689ce53 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -31,6 +31,9 @@ class IgniteApplicationService(IgniteAwareService):
     """
 
     SERVICE_JAVA_CLASS_NAME = "org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"
+    APP_INIT_EVT_MSG = "IGNITE_APPLICATION_INITIALIZED"
+    APP_FINISH_EVT_MSG = "IGNITE_APPLICATION_FINISHED"
+    APP_BROKEN_EVT_MSG = "IGNITE_APPLICATION_BROKEN"
 
     # pylint: disable=R0913
     def __init__(self, context, config, java_class_name, num_nodes=1, params="", startup_timeout_sec=60,
@@ -47,18 +50,18 @@ class IgniteApplicationService(IgniteAwareService):
     def await_started(self):
         super().await_started()
 
-        self.__check_status("IGNITE_APPLICATION_INITIALIZED", timeout=self.startup_timeout_sec)
+        self.__check_status(self.APP_INIT_EVT_MSG, timeout=self.startup_timeout_sec)
 
     def await_stopped(self):
         super().await_stopped()
 
-        self.__check_status("IGNITE_APPLICATION_FINISHED")
+        self.__check_status(self.APP_FINISH_EVT_MSG)
 
     def __check_status(self, desired, timeout=1):
-        self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, from_the_beginning=True)
+        self.await_event("%s\\|%s" % (desired, self.APP_BROKEN_EVT_MSG), timeout, from_the_beginning=True)
 
         try:
-            self.await_event("IGNITE_APPLICATION_BROKEN", 1, from_the_beginning=True)
+            self.await_event(self.APP_BROKEN_EVT_MSG, 1, from_the_beginning=True)
             raise IgniteExecutionException("Java application execution failed. %s" % self.extract_result("ERROR"))
         except TimeoutError:
             pass
@@ -68,6 +71,22 @@ class IgniteApplicationService(IgniteAwareService):
         except Exception:
             raise Exception("Java application execution failed.") from None
 
+    def get_init_time(self, selector=min):
+        """
+        Gets the time of the application init event (APP_INIT_EVT_MSG) across all service nodes.
+        :param selector: Function choosing one time among the nodes' init times, default is min.
+        :return: Application initialization time, or None if no node logged the init event.
+        """
+        return self.get_event_time(self.APP_INIT_EVT_MSG, selector=selector)
+
+    def get_finish_time(self, selector=max):
+        """
+        Gets the time of the application finish event (APP_FINISH_EVT_MSG) across all service nodes.
+        :param selector: Function choosing one time among the nodes' finish times, default is max.
+        :return: Application finish time, or None if no node logged the finish event.
+        """
+        return self.get_event_time(self.APP_FINISH_EVT_MSG, selector=selector)
+
     def clean_node(self, node, **kwargs):
         if self.alive(node):
             self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 794cdf1..2b8e0d2 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -17,6 +17,7 @@
 This module contains the base class to build services aware of Ignite.
 """
 import os
+import re
 import signal
 import socket
 import sys
@@ -275,6 +276,31 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
             self.await_event_on_node(evt_message, node, timeout_sec, from_the_beginning=from_the_beginning,
                                      backoff_sec=backoff_sec)
 
+    @staticmethod
+    def event_time(evt_message, node):
+        """
+        Gets the time of the first log line matching an event pattern in a node's log file.
+        :param evt_message: grep pattern to search the log for.
+        :param node: Ducktape node whose log file is searched.
+        :return: Time from the first matching line, or None if nothing matched or it has no leading [timestamp].
+        """
+        _, stdout, _ = node.account.ssh_client.exec_command(
+            "grep '%s' %s" % (evt_message, node.log_file))
+
+        match = re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8"))
+
+        return datetime.strptime(match.group(), "[%Y-%m-%d %H:%M:%S,%f]") if match else None
+
+    def get_event_time(self, evt_message, selector=max):
+        """
+        Gets the time of an event across all service nodes, reduced to one value by selector.
+        :param evt_message: Event message (grep pattern).
+        :param selector: min/max-like function, called with a 'default' keyword; default is max.
+        :return: Selected event time, or None if the event is not found on any node.
+        """
+        return selector(filter(lambda t: t is not None,
+                               map(lambda node: self.event_time(evt_message, node), self.nodes)), default=None)
+
     def exec_on_nodes_async(self, nodes, task, simultaneously=True, delay_ms=0, timeout_sec=20):
         """
         Executes given task on the nodes.
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
index fdf3792..862b674 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/__init__.py
@@ -17,6 +17,9 @@
 This package contains rebalance tests.
 """
 
+from datetime import datetime
+from typing import NamedTuple
+
 # pylint: disable=W0622
 from ducktape.errors import TimeoutError
 
@@ -45,9 +48,10 @@ def preload_data(context, config, backups, cache_count, entry_count, entry_size,
         startup_timeout_sec=timeout)
     app.run()
 
-    return (get_event_time(
-        app, app.nodes[0], "Marking as initialized") - get_event_time(
-        app, app.nodes[0], "Data generation started")).total_seconds()
+    app.await_started()
+    app.await_stopped()
+
+    return (app.get_finish_time() - app.get_init_time()).total_seconds()
 
 
 def await_rebalance_start(ignite, timeout=1):
@@ -68,7 +72,7 @@ def await_rebalance_start(ignite, timeout=1):
         except TimeoutError:
             continue
         else:
-            return {"node": node, "time": rebalance_start_time}
+            return node, rebalance_start_time
 
     raise RuntimeError("Rebalance start was not detected on any node")
 
@@ -93,3 +97,73 @@ def await_rebalance_complete(ignite, node=None, cache_count=1, timeout=300):
             timeout=timeout))
 
     return max(rebalance_complete_times)
+
+
+def get_rebalance_metrics(node, cache_group):
+    """
+    Gets rebalance metrics of the given cache group on the given node via its JMX cacheGroups MBean.
+    :param node: Ignite node (its JMX client is queried).
+    :param cache_group: Cache group name.
+    :return: RebalanceMetrics; a time is None if JMX gave -1, and duration (sec) is 0 unless both are set.
+    """
+    mbean = node.jmx_client().find_mbean('.*group=cacheGroups.*name="%s"' % cache_group)
+    start_time = to_datetime(int(next(mbean.RebalancingStartTime)))
+    end_time = to_datetime(int(next(mbean.RebalancingEndTime)))
+
+    return RebalanceMetrics(
+        received_bytes=int(next(mbean.RebalancingReceivedBytes)),
+        start_time=start_time,
+        end_time=end_time,
+        duration=(end_time - start_time).total_seconds() if start_time and end_time else 0)
+
+
+def to_datetime(timestamp):
+    """
+    Converts an epoch timestamp in milliseconds to a naive local-time datetime.
+    :param timestamp: Timestamp in milliseconds.
+    :return: datetime constructed from timestamp, or None if timestamp == -1.
+    """
+    return None if timestamp == -1 else datetime.fromtimestamp(timestamp / 1000.0)
+
+
+class RebalanceMetrics(NamedTuple):
+    """
+    Rebalance metrics: received bytes, start/end time (None if unknown) and duration in seconds.
+    """
+    received_bytes: int = 0
+    start_time: datetime = None
+    end_time: datetime = None
+    duration: float = 0
+
+
+def aggregate_rebalance_stats(nodes, cache_count):
+    """
+    Aggregates rebalance stats over all given nodes and cache groups:
+    received_bytes -> sum(all of received_bytes)
+    start_time -> min(all non-None start_time), None if none is set
+    end_time -> max(all non-None end_time), None if none is set
+    duration -> sum(all of duration)
+    :param nodes: Nodes list.
+    :param cache_count: Cache count; groups test-cache-1 .. test-cache-<cache_count> are queried.
+    :return: RebalanceMetrics instance with aggregated values.
+    """
+    received_bytes = 0
+    start_time = None
+    end_time = None
+    duration = 0
+
+    for node in nodes:
+        for cache_idx in range(cache_count):
+            metrics = get_rebalance_metrics(node, "test-cache-%d" % (cache_idx + 1))
+            received_bytes += metrics.received_bytes
+            if metrics.start_time is not None:
+                start_time = min(t for t in [start_time, metrics.start_time] if t is not None)
+            if metrics.end_time is not None:
+                end_time = max(t for t in [end_time, metrics.end_time] if t is not None)
+            duration += metrics.duration
+
+    return RebalanceMetrics(
+        received_bytes=received_bytes,
+        start_time=start_time,
+        end_time=end_time,
+        duration=duration)
diff --git a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
index c950b84..daa68f1 100644
--- a/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/rebalance/in_memory_test.py
@@ -24,7 +24,8 @@ from ignitetest.services.ignite import IgniteService
 from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
 from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
-from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete
+from ignitetest.tests.rebalance import preload_data, await_rebalance_start, await_rebalance_complete, \
+    aggregate_rebalance_stats
 from ignitetest.utils import cluster, ignite_versions
 from ignitetest.utils.enum import constructible
 from ignitetest.utils.ignite_test import IgniteTest
@@ -73,6 +74,7 @@ class RebalanceInMemoryTest(IgniteTest):
                 default=DataRegionConfiguration(max_size=max(
                     cache_count * entry_count * entry_size * (backups + 1),
                     self.DEFAULT_DATA_REGION_SZ))),
+            metric_exporter="org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi",
             rebalance_thread_pool_size=rebalance_thread_pool_size,
             rebalance_batch_size=rebalance_batch_size,
             rebalance_batches_prefetch_count=rebalance_batches_prefetch_count,
@@ -95,10 +97,22 @@ class RebalanceInMemoryTest(IgniteTest):
                                    num_nodes=1)
             ignite.start()
 
-        start_node_and_time = await_rebalance_start(ignite)
+        start_node, start_time = await_rebalance_start(ignite)
 
-        complete_time = await_rebalance_complete(ignite, start_node_and_time["node"], cache_count)
+        end_time = await_rebalance_complete(ignite, start_node, cache_count)
 
-        return {"Rebalanced in (sec)": (complete_time - start_node_and_time["time"]).total_seconds(),
-                "Preloaded in (sec)": preload_time,
-                "Preload speed (MB/sec)": int(cache_count * entry_count * entry_size / 1000 / preload_time) / 1000.0}
+        rebalance_nodes = ignite.nodes[:-1] if trigger_event else ignite.nodes
+        stats = aggregate_rebalance_stats(rebalance_nodes, cache_count)
+        # Overall rebalance window; 0 when JMX reported no start/end time, so speed() yields None.
+        total_sec = (stats.end_time - stats.start_time).total_seconds() if stats.start_time and stats.end_time else 0
+
+        def speed(d): return (int(stats.received_bytes / d * 1000 / 1024 / 1024) / 1000.0) if d else None
+        return {
+            "Rebalanced in (sec)": (end_time - start_time).total_seconds(),
+            "Rebalance nodes": len(rebalance_nodes),
+            "Rebalance speed (Total, MiB/sec)": speed(total_sec),
+            "Rebalance speed (Average per node, MiB/sec)": speed(stats.duration),
+            "Preloaded in (sec)": preload_time,
+            "Preload speed (MiB/sec)":
+                int(cache_count * entry_count * entry_size * 1000 / 1024 / 1024 / preload_time) / 1000.0
+        }