You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this copy.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/05/07 13:27:02 UTC

[ignite] branch ignite-ducktape updated: IGNITE-14659 Check multikey and noncollocated tx impact on sync-free switch (#9051)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 79027ca  IGNITE-14659 Check multikey and noncollocated tx impact on sync-free switch (#9051)
79027ca is described below

commit 79027ca34a87f3e6f9d1d43bd57ff6ccb7a8f3a7
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri May 7 16:26:35 2021 +0300

    IGNITE-14659 Check multikey and noncollocated tx impact on sync-free switch (#9051)
---
 .../CellularPreparedTxStreamer.java                | 89 ++++++++++++++++++----
 .../ignitetest/services/utils/ignite_aware.py      |  8 +-
 .../tests/ignitetest/services/utils/ignite_spec.py | 11 ++-
 .../tests/ignitetest/services/utils/path.py        | 14 +++-
 .../services/utils/templates/ignite.xml.j2         |  4 +-
 .../services/utils/templates/log4j.xml.j2          |  8 +-
 .../ignitetest/tests/cellular_affinity_test.py     | 51 ++++++++++---
 .../ducktests/tests/ignitetest/tests/self_test.py  |  8 +-
 .../tests/ignitetest/utils/ignite_test.py          | 25 +++---
 9 files changed, 168 insertions(+), 50 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
index 9c7a97e..712f7a36 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.ignite.IgniteCache;
@@ -38,7 +40,11 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
         final String cacheName = jsonNode.get("cacheName").asText();
         final String attr = jsonNode.get("attr").asText();
         final String cell = jsonNode.get("cell").asText();
-        final int txCnt = jsonNode.get("txCnt").asInt();
+        final int txCnt = jsonNode.get("colocatedTxCnt").asInt();
+        final int multiTxCnt = jsonNode.get("multiTxCnt").asInt();
+        final int noncolocatedTxCnt = jsonNode.get("noncolocatedTxCnt").asInt();
+
+        final String avoidCell = "C0"; // Always exist, should show speed of non-affected cell.
 
         markInitialized();
 
@@ -51,32 +57,71 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
         Affinity<Integer> aff = ignite.affinity(cacheName);
 
         int cnt = 0;
-        int i = -1; // Negative keys to have no intersection with load.
+        int i = 0; // Negative keys to have no intersection with load.
+
+        // Single key transactions affects only current cell.
+        // Will cause delay during current cell switch.
 
         while (cnt != txCnt && !terminated()) {
-            Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
+            if (getCellIdByKey(aff, --i, attr).equals(cell)) {
+                Transaction tx = ignite.transactions().txStart();
+
+                cache.put(i, i);
+
+                ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
 
-            Map<Object, Long> stat = nodes.stream().collect(
-                Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting()));
+                if (cnt++ % 100 == 0)
+                    log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + "]");
+            }
+        }
+
+        // Multikey transactions.
+        // May cause delay during current and other cell switch.
+
+        cnt = 0;
 
-            assert 1 == stat.keySet().size() :
-                "Partition should be located on nodes from only one cell " +
-                    "[key=" + i + ", nodes=" + nodes.size() + ", stat=" + stat + "]";
+        assert i > -10_000_000;
+        i = -10_000_000; // To have no intersection with other node's txs.
+
+        while (cnt != multiTxCnt && !terminated()) {
+            Set<Integer> keys = new HashSet<>();
+
+            while (keys.size() < 3) {
+                if (!getCellIdByKey(aff, --i, attr).equals(avoidCell))
+                    keys.add(i);
+            }
 
-            if (stat.containsKey(cell)) {
-                cnt++;
+            Transaction tx = ignite.transactions().txStart();
 
+            for (int key : keys)
+                cache.put(key, key);
+
+            ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+            if (cnt++ % 100 == 0)
+                log.info("Long Multikey Tx prepared [key=" + i + ",cnt=" + cnt + "]");
+        }
+
+        // Transactions started from this node but contain no local keys.
+        // Should not cause significant delay during any cell switch.
+
+        cnt = 0;
+        assert i > -20_000_000;
+        i = -20_000_000; // To have no intersection with other node's txs.
+
+        while (cnt != noncolocatedTxCnt && !terminated()) {
+            String keyCell = getCellIdByKey(aff, --i, attr);
+
+            if (!keyCell.equals(cell) && !keyCell.equals(avoidCell)) {
                 Transaction tx = ignite.transactions().txStart();
 
                 cache.put(i, i);
 
                 ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
 
-                if (cnt % 100 == 0)
-                    log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + ", cell=" + stat.keySet() + "]");
+                if (cnt++ % 100 == 0)
+                    log.info("Long Noncolocated Tx prepared [key=" + i + ",cnt=" + cnt + "]");
             }
-
-            i--;
         }
 
         log.info("ALL_TRANSACTIONS_PREPARED (" + cnt + ")");
@@ -89,4 +134,20 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
 
         markFinished();
     }
+
+    /**
+     * Returns the {@code attr} node attribute value (cell id) shared by all primary and backup nodes of {@code key}.
+     */
+    private String getCellIdByKey(Affinity<Integer> aff, int key, String attr) {
+        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+        Map<Object, Long> stat = nodes.stream().collect(
+            Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting()));
+
+        assert 1 == stat.keySet().size() :
+            "Partition should be located on nodes from only one cell " +
+                "[key=" + key + ", nodes=" + nodes.size() + ", stat=" + stat + "]";
+
+        return (String)stat.keySet().iterator().next();
+    }
 }
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 663d2f2..c7aa560 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -195,7 +195,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
             .prepare_for_env(self, node)
 
         for name, template in self.spec.config_templates():
-            config_txt = template.render(config_dir=self.config_dir, work_dir=self.work_dir, config=config)
+            config_txt = template.render(service=self, config=config)
 
             node.account.create_file(os.path.join(self.config_dir, name), config_txt)
 
@@ -460,13 +460,13 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
         Update the node log file.
         """
         if not hasattr(node, 'log_file'):
-            node.log_file = os.path.join(self.log_dir, "console.log")
+            node.log_file = os.path.join(self.log_dir, "ignite.log")
 
         cnt = list(node.account.ssh_capture(f'ls {self.log_dir} | '
-                                            f'grep -E "^console.log(.[0-9]+)?$" | '
+                                            f'grep -E "^ignite.log(.[0-9]+)?$" | '
                                             f'wc -l', callback=int))[0]
         if cnt > 0:
-            rotated_log = os.path.join(self.log_dir, f"console.log.{cnt}")
+            rotated_log = os.path.join(self.log_dir, f"ignite.log.{cnt}")
             self.logger.debug(f"rotating {node.log_file} to {rotated_log} on {node.name}")
             node.account.ssh(f"mv {node.log_file} {rotated_log}")
 
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 292bb5e..3a76af7 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -31,6 +31,7 @@ from ignitetest.services.utils.config_template import IgniteClientConfigTemplate
 from ignitetest.services.utils.jvm_utils import create_jvm_settings, merge_jvm_settings
 from ignitetest.services.utils.path import get_home_dir, get_module_path, IgnitePathAware
 from ignitetest.services.utils.ssl.ssl_params import is_ssl_enabled
+from ignitetest.utils.ignite_test import JFR_ENABLED
 from ignitetest.utils.version import DEV_BRANCH
 
 
@@ -92,6 +93,12 @@ class IgniteSpec(metaclass=ABCMeta):
                             "-Dlog4j.configuration=file:" + self.service.log_config_file,
                             "-Dlog4j.configDebug=true"])
 
+        if service.context.globals.get(JFR_ENABLED, False):
+            self._add_jvm_opts(["-XX:+UnlockCommercialFeatures",
+                                "-XX:+FlightRecorder",
+                                "-XX:StartFlightRecording=dumponexit=true," +
+                                f"filename={self.service.jfr_dir}/recording.jfr"])
+
     def config_templates(self):
         """
         :return: config that service will use to start on a node
@@ -223,7 +230,7 @@ class IgniteNodeSpec(IgniteSpec):
                self.service.script("ignite.sh"),
                self._jvm_opts(),
                self.config_file_path(),
-               node.log_file)
+               os.path.join(self.service.log_dir, "console.log"))
 
         return cmd
 
@@ -253,7 +260,7 @@ class IgniteApplicationSpec(IgniteSpec):
                self.service.script("ignite.sh"),
                self._jvm_opts(),
                ",".join(args),
-               node.log_file)
+               os.path.join(self.service.log_dir, "console.log"))
 
         return cmd
 
diff --git a/modules/ducktests/tests/ignitetest/services/utils/path.py b/modules/ducktests/tests/ignitetest/services/utils/path.py
index efac7a3..71bf0a5 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/path.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/path.py
@@ -51,7 +51,8 @@ class PathAware:
         Init persistent directory.
         :param node: Service node.
         """
-        node.account.mkdirs(f"{self.persistent_root} {self.temp_dir} {self.work_dir} {self.log_dir} {self.config_dir}")
+        node.account.mkdirs(
+            f"{self.persistent_root} {self.temp_dir} {self.work_dir} {self.log_dir} {self.config_dir} {self.jfr_dir}")
 
     def init_logs_attribute(self):
         """
@@ -70,6 +71,10 @@ class PathAware:
             "shared": {
                 "path": self.shared_root,
                 "collect_default": True
+            },
+            "jfr": {
+                "path": self.jfr_dir,
+                "collect_default": True
             }
         })
 
@@ -102,6 +107,13 @@ class PathAware:
         return os.path.join(self.persistent_root, "config")
 
     @property
+    def jfr_dir(self):
+        """
+        :return: path to jfr directory
+        """
+        return os.path.join(self.persistent_root, "jfr")
+
+    @property
     def log_dir(self):
         """
         :return: path to log directory
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index f8993d0..2cd7370 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -31,10 +31,10 @@
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
     <bean class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="workDirectory" value="{{ work_dir }}" />
+        <property name="workDirectory" value="{{ service.work_dir }}" />
         <property name="gridLogger">
             <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
-                <constructor-arg type="java.lang.String" value="{{ config_dir }}/ignite-log4j.xml"/>
+                <constructor-arg type="java.lang.String" value="{{ service.config_dir }}/ignite-log4j.xml"/>
             </bean>
         </property>
 
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
index 57a00c0..06e8e12 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
@@ -20,9 +20,9 @@
 -->
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
-    <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
-        <param name="Target" value="System.err"/>
-
+    <appender name="FILE" class="org.apache.log4j.FileAppender">
+        <param name="File" value="{{ service.log_dir }}/ignite.log"/>
+        <param name="Append" value="true"/>
         <layout class="org.apache.log4j.PatternLayout">
             <param name="ConversionPattern" value="[%d{{ISO8601}}][%-5p][%t][%c{{1}}] %m%n"/>
         </layout>
@@ -58,6 +58,6 @@
 
     <root>
         <level value="INFO"/>
-        <appender-ref ref="CONSOLE_ERR"/>
+        <appender-ref ref="FILE"/>
     </root>
 </log4j:configuration>
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index ae6850e..801e270 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -54,6 +54,16 @@ class DiscoreryType(IntEnum):
     TCP = 1
 
 
+@constructible
+class TxPrepType(IntEnum):
+    """
+    Transaction preparation type: which kind of prepared txs the cell streamers create.
+    """
+    CELL_COLOCATED = 0  # single-key txs on keys of each streamer's own cell
+    CELL_NONCOLOCATED = 1  # failed cell's txs on keys of other cells (not C0)
+    MULTIKEY = 2  # failed cell's 3-key txs on keys outside C0
+
+
 # pylint: disable=W0223
 class CellularAffinity(IgniteTest):
     """
@@ -135,11 +145,13 @@ class CellularAffinity(IgniteTest):
     # pylint: disable=R0912
     # pylint: disable=R0914
     # pylint: disable=no-member
+    # pylint: disable=too-many-statements
     @cluster(num_nodes=2 * (NODES_PER_CELL + 1) + 3)  # cell_cnt * (srv_per_cell + cell_streamer) + zookeper_cluster
     @ignite_versions(str(DEV_BRANCH), str(LATEST))
     @matrix(stop_type=[StopType.DROP_NETWORK, StopType.SIGKILL, StopType.SIGTERM],
-            discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP])
-    def test_latency(self, ignite_version, stop_type, discovery_type):
+            discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP],
+            prep_type=[TxPrepType.CELL_COLOCATED])
+    def test_latency(self, ignite_version, stop_type, discovery_type, prep_type):
         """
         Tests Cellular switch tx latency.
         """
@@ -169,7 +181,8 @@ class CellularAffinity(IgniteTest):
 
             discovery_spi = from_zookeeper_cluster(zk_quorum)
 
-        cell0, prepared_tx_loader1 = self.start_cell_with_prepared_txs(ignite_version, "C0", discovery_spi, modules)
+        cell0, prepared_tx_loader1 = \
+            self.start_cell_with_prepared_txs(ignite_version, f'C{0}', discovery_spi, modules)
 
         if d_type is DiscoreryType.TCP:
             discovery_spi = from_ignite_cluster(cell0)
@@ -179,14 +192,30 @@ class CellularAffinity(IgniteTest):
         loaders = [prepared_tx_loader1]
         nodes = [cell0]
 
-        for cell in range(1, cells_amount):
+        failed_cell_id = 1
+
+        for cell_id in range(1, cells_amount):
+            # per cell
+            coll_cnt = self.PREPARED_TX_CNT if prep_type == TxPrepType.CELL_COLOCATED else 0
+
+            # should not affect switch speed dramatically, cause recovery but not waiting
+            # avoiding C0 (as not affected) & C1
+            noncoll_cnt = self.PREPARED_TX_CNT * (cells_amount - 2) \
+                if cell_id == failed_cell_id and prep_type == TxPrepType.CELL_NONCOLOCATED else 0
+
+            # cause waiting for txs with failed primary (~ 3/(cells-1) of prepared tx amount)
+            # avoiding C0 (as not affected)
+            multi_cnt = self.PREPARED_TX_CNT * (cells_amount - 1) \
+                if cell_id == failed_cell_id and prep_type == TxPrepType.MULTIKEY else 0
+
             node, prepared_tx_loader = \
-                self.start_cell_with_prepared_txs(ignite_version, "C%d" % cell, discovery_spi, modules)
+                self.start_cell_with_prepared_txs(
+                    ignite_version, f'C{cell_id}', discovery_spi, modules, coll_cnt, noncoll_cnt, multi_cnt)
 
             loaders.append(prepared_tx_loader)
             nodes.append(node)
 
-        failed_loader = loaders[1]
+        failed_loader = loaders[failed_cell_id]
 
         for node in [*nodes, *loaders]:
             node.await_started()
@@ -259,7 +288,9 @@ class CellularAffinity(IgniteTest):
                     "warmup": 10000},
             modules=modules, startup_timeout_sec=180)
 
-    def start_cell_with_prepared_txs(self, version, cell_id, discovery_spi, modules):
+    # pylint: disable=too-many-arguments
+    def start_cell_with_prepared_txs(
+            self, version, cell_id, discovery_spi, modules, col_cnt=0, noncol_cnt=0, multi_cnt=0):
         """
         Starts cell with prepared transactions.
         """
@@ -276,14 +307,16 @@ class CellularAffinity(IgniteTest):
             params={"cacheName": CellularAffinity.CACHE_NAME,
                     "attr": CellularAffinity.ATTRIBUTE,
                     "cell": cell_id,
-                    "txCnt": CellularAffinity.PREPARED_TX_CNT},
+                    "colocatedTxCnt": col_cnt,
+                    "multiTxCnt": multi_cnt,
+                    "noncolocatedTxCnt": noncol_cnt},
             jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id], modules=modules, startup_timeout_sec=180)
 
         prepared_tx_streamer.start_async()  # starts last server node and creates prepared txs on it.
 
         return nodes, prepared_tx_streamer
 
-    # pylint: disable=R0913
+    # pylint: disable=too-many-arguments
     def start_cell(self, version, jvm_opts, discovery_spi=None, modules=None, nodes_cnt=NODES_PER_CELL):
         """
         Starts cell.
diff --git a/modules/ducktests/tests/ignitetest/tests/self_test.py b/modules/ducktests/tests/ignitetest/tests/self_test.py
index c0fed29..9887e70 100644
--- a/modules/ducktests/tests/ignitetest/tests/self_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/self_test.py
@@ -111,7 +111,7 @@ class SelfTest(IgniteTest):
 
         def get_logs_count(service):
             node = service.nodes[0]
-            return list(node.account.ssh_capture(f'ls {service.log_dir}/console.log* | wc -l', callback=int))[0]
+            return list(node.account.ssh_capture(f'ls {service.log_dir}/ignite.log* | wc -l', callback=int))[0]
 
         ignites = IgniteService(self.test_context, IgniteConfiguration(version=IgniteVersion(ignite_version)),
                                 num_nodes=1)
@@ -122,16 +122,16 @@ class SelfTest(IgniteTest):
         for i in range(num_restarts - 1):
             ignites.stop()
 
-            old_cnt = get_log_lines_count(ignites, "console.log")
+            old_cnt = get_log_lines_count(ignites, "ignite.log")
             assert old_cnt > 0
 
             ignites.start(clean=False)
 
-            new_cnt = get_log_lines_count(ignites, "console.log")
+            new_cnt = get_log_lines_count(ignites, "ignite.log")
             assert new_cnt > 0
 
             # check that there is no new entry in rotated file
-            assert old_cnt == get_log_lines_count(ignites, f"console.log.{i + 1}")
+            assert old_cnt == get_log_lines_count(ignites, f"ignite.log.{i + 1}")
 
         assert get_logs_count(ignites) == num_restarts
 
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 6530eab..a9e82ac 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -24,6 +24,9 @@ from ducktape.tests.test import Test
 # pylint: disable=W0223
 from ignitetest.services.utils.ducktests_service import DucktestsService
 
+# globals:
+JFR_ENABLED = "jfr_enabled"
+
 
 class IgniteTest(Test):
     """
@@ -45,20 +48,22 @@ class IgniteTest(Test):
         return monotonic()
 
     def tearDown(self):
-        self.logger.debug("Killing all runned services to speed-up the tearing down.")
+        # jfr requires graceful shutdown to save the recording.
+        if not self.test_context.globals.get(JFR_ENABLED, False):
+            self.logger.debug("Killing all runned services to speed-up the tearing down.")
 
-        # pylint: disable=W0212
-        for service in self.test_context.services._services.values():
-            assert isinstance(service, DucktestsService)
+            # pylint: disable=W0212
+            for service in self.test_context.services._services.values():
+                assert isinstance(service, DucktestsService)
 
-            try:
-                service.kill()
-            except RemoteCommandError:
-                pass  # Process may be already self-killed on segmentation.
+                try:
+                    service.kill()
+                except RemoteCommandError:
+                    pass  # Process may be already self-killed on segmentation.
 
-            assert service.stopped
+                assert service.stopped
 
-        self.logger.debug("All runned services killed.")
+            self.logger.debug("All runned services killed.")
 
         super().tearDown()