You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/05/07 13:27:02 UTC
[ignite] branch ignite-ducktape updated: IGNITE-14659 Check
multikey and noncollocated tx impact on sync-free switch (#9051)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 79027ca IGNITE-14659 Check multikey and noncollocated tx impact on sync-free switch (#9051)
79027ca is described below
commit 79027ca34a87f3e6f9d1d43bd57ff6ccb7a8f3a7
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri May 7 16:26:35 2021 +0300
    IGNITE-14659 Check multikey and noncollocated tx impact on sync-free switch (#9051)
---
.../CellularPreparedTxStreamer.java | 89 ++++++++++++++++++----
.../ignitetest/services/utils/ignite_aware.py | 8 +-
.../tests/ignitetest/services/utils/ignite_spec.py | 11 ++-
.../tests/ignitetest/services/utils/path.py | 14 +++-
.../services/utils/templates/ignite.xml.j2 | 4 +-
.../services/utils/templates/log4j.xml.j2 | 8 +-
.../ignitetest/tests/cellular_affinity_test.py | 51 ++++++++++---
.../ducktests/tests/ignitetest/tests/self_test.py | 8 +-
.../tests/ignitetest/utils/ignite_test.py | 25 +++---
9 files changed, 168 insertions(+), 50 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
index 9c7a97e..712f7a36 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
@@ -38,7 +40,11 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
final String cacheName = jsonNode.get("cacheName").asText();
final String attr = jsonNode.get("attr").asText();
final String cell = jsonNode.get("cell").asText();
- final int txCnt = jsonNode.get("txCnt").asInt();
+ final int txCnt = jsonNode.get("colocatedTxCnt").asInt();
+ final int multiTxCnt = jsonNode.get("multiTxCnt").asInt();
+ final int noncolocatedTxCnt = jsonNode.get("noncolocatedTxCnt").asInt();
+
+ final String avoidCell = "C0"; // Always exist, should show speed of non-affected cell.
markInitialized();
@@ -51,32 +57,71 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
Affinity<Integer> aff = ignite.affinity(cacheName);
int cnt = 0;
- int i = -1; // Negative keys to have no intersection with load.
+ int i = 0; // Negative keys to have no intersection with load.
+
+ // Single key transactions affects only current cell.
+ // Will cause delay during current cell switch.
while (cnt != txCnt && !terminated()) {
- Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
+ if (getCellIdByKey(aff, --i, attr).equals(cell)) {
+ Transaction tx = ignite.transactions().txStart();
+
+ cache.put(i, i);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
- Map<Object, Long> stat = nodes.stream().collect(
- Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting()));
+ if (cnt++ % 100 == 0)
+ log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + "]");
+ }
+ }
+
+ // Multikey transactions.
+ // May cause delay during current and other cell switch.
+
+ cnt = 0;
- assert 1 == stat.keySet().size() :
- "Partition should be located on nodes from only one cell " +
- "[key=" + i + ", nodes=" + nodes.size() + ", stat=" + stat + "]";
+ assert i > -10_000_000;
+ i = -10_000_000; // To have no intersection with other node's txs.
+
+ while (cnt != multiTxCnt && !terminated()) {
+ Set<Integer> keys = new HashSet<>();
+
+ while (keys.size() < 3) {
+ if (!getCellIdByKey(aff, --i, attr).equals(avoidCell))
+ keys.add(i);
+ }
- if (stat.containsKey(cell)) {
- cnt++;
+ Transaction tx = ignite.transactions().txStart();
+ for (int key : keys)
+ cache.put(key, key);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ if (cnt++ % 100 == 0)
+ log.info("Long Multikey Tx prepared [key=" + i + ",cnt=" + cnt + "]");
+ }
+
+ // Transactions started from this node but contain no local keys.
+ // Should not cause significant delay during any cell switch.
+
+ cnt = 0;
+ assert i > -20_000_000;
+ i = -20_000_000; // To have no intersection with other node's txs.
+
+ while (cnt != noncolocatedTxCnt && !terminated()) {
+ String keyCell = getCellIdByKey(aff, --i, attr);
+
+ if (!keyCell.equals(cell) && !keyCell.equals(avoidCell)) {
Transaction tx = ignite.transactions().txStart();
cache.put(i, i);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
- if (cnt % 100 == 0)
- log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + ", cell=" + stat.keySet() + "]");
+ if (cnt++ % 100 == 0)
+ log.info("Long Noncolocated Tx prepared [key=" + i + ",cnt=" + cnt + "]");
}
-
- i--;
}
log.info("ALL_TRANSACTIONS_PREPARED (" + cnt + ")");
@@ -89,4 +134,20 @@ public class CellularPreparedTxStreamer extends IgniteAwareApplication {
markFinished();
}
+
+ /**
+ *
+ */
+ private String getCellIdByKey(Affinity<Integer> aff, int key, String attr) {
+ Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+ Map<Object, Long> stat = nodes.stream().collect(
+ Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting()));
+
+ assert 1 == stat.keySet().size() :
+ "Partition should be located on nodes from only one cell " +
+ "[key=" + key + ", nodes=" + nodes.size() + ", stat=" + stat + "]";
+
+ return (String)stat.keySet().iterator().next();
+ }
}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 663d2f2..c7aa560 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -195,7 +195,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
.prepare_for_env(self, node)
for name, template in self.spec.config_templates():
- config_txt = template.render(config_dir=self.config_dir, work_dir=self.work_dir, config=config)
+ config_txt = template.render(service=self, config=config)
node.account.create_file(os.path.join(self.config_dir, name), config_txt)
@@ -460,13 +460,13 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
Update the node log file.
"""
if not hasattr(node, 'log_file'):
- node.log_file = os.path.join(self.log_dir, "console.log")
+ node.log_file = os.path.join(self.log_dir, "ignite.log")
cnt = list(node.account.ssh_capture(f'ls {self.log_dir} | '
- f'grep -E "^console.log(.[0-9]+)?$" | '
+ f'grep -E "^ignite.log(.[0-9]+)?$" | '
f'wc -l', callback=int))[0]
if cnt > 0:
- rotated_log = os.path.join(self.log_dir, f"console.log.{cnt}")
+ rotated_log = os.path.join(self.log_dir, f"ignite.log.{cnt}")
self.logger.debug(f"rotating {node.log_file} to {rotated_log} on {node.name}")
node.account.ssh(f"mv {node.log_file} {rotated_log}")
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 292bb5e..3a76af7 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -31,6 +31,7 @@ from ignitetest.services.utils.config_template import IgniteClientConfigTemplate
from ignitetest.services.utils.jvm_utils import create_jvm_settings, merge_jvm_settings
from ignitetest.services.utils.path import get_home_dir, get_module_path, IgnitePathAware
from ignitetest.services.utils.ssl.ssl_params import is_ssl_enabled
+from ignitetest.utils.ignite_test import JFR_ENABLED
from ignitetest.utils.version import DEV_BRANCH
@@ -92,6 +93,12 @@ class IgniteSpec(metaclass=ABCMeta):
"-Dlog4j.configuration=file:" + self.service.log_config_file,
"-Dlog4j.configDebug=true"])
+ if service.context.globals.get(JFR_ENABLED, False):
+ self._add_jvm_opts(["-XX:+UnlockCommercialFeatures",
+ "-XX:+FlightRecorder",
+ "-XX:StartFlightRecording=dumponexit=true," +
+ f"filename={self.service.jfr_dir}/recording.jfr"])
+
def config_templates(self):
"""
:return: config that service will use to start on a node
@@ -223,7 +230,7 @@ class IgniteNodeSpec(IgniteSpec):
self.service.script("ignite.sh"),
self._jvm_opts(),
self.config_file_path(),
- node.log_file)
+ os.path.join(self.service.log_dir, "console.log"))
return cmd
@@ -253,7 +260,7 @@ class IgniteApplicationSpec(IgniteSpec):
self.service.script("ignite.sh"),
self._jvm_opts(),
",".join(args),
- node.log_file)
+ os.path.join(self.service.log_dir, "console.log"))
return cmd
diff --git a/modules/ducktests/tests/ignitetest/services/utils/path.py b/modules/ducktests/tests/ignitetest/services/utils/path.py
index efac7a3..71bf0a5 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/path.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/path.py
@@ -51,7 +51,8 @@ class PathAware:
Init persistent directory.
:param node: Service node.
"""
- node.account.mkdirs(f"{self.persistent_root} {self.temp_dir} {self.work_dir} {self.log_dir} {self.config_dir}")
+ node.account.mkdirs(
+ f"{self.persistent_root} {self.temp_dir} {self.work_dir} {self.log_dir} {self.config_dir} {self.jfr_dir}")
def init_logs_attribute(self):
"""
@@ -70,6 +71,10 @@ class PathAware:
"shared": {
"path": self.shared_root,
"collect_default": True
+ },
+ "jfr": {
+ "path": self.jfr_dir,
+ "collect_default": True
}
})
@@ -102,6 +107,13 @@ class PathAware:
return os.path.join(self.persistent_root, "config")
@property
+ def jfr_dir(self):
+ """
+ :return: path to jfr directory
+ """
+ return os.path.join(self.persistent_root, "jfr")
+
+ @property
def log_dir(self):
"""
:return: path to log directory
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index f8993d0..2cd7370 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -31,10 +31,10 @@
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="workDirectory" value="{{ work_dir }}" />
+ <property name="workDirectory" value="{{ service.work_dir }}" />
<property name="gridLogger">
<bean class="org.apache.ignite.logger.log4j.Log4JLogger">
- <constructor-arg type="java.lang.String" value="{{ config_dir }}/ignite-log4j.xml"/>
+ <constructor-arg type="java.lang.String" value="{{ service.config_dir }}/ignite-log4j.xml"/>
</bean>
</property>
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
index 57a00c0..06e8e12 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j.xml.j2
@@ -20,9 +20,9 @@
-->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
- <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
- <param name="Target" value="System.err"/>
-
+ <appender name="FILE" class="org.apache.log4j.FileAppender">
+ <param name="File" value="{{ service.log_dir }}/ignite.log"/>
+ <param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{{ISO8601}}][%-5p][%t][%c{{1}}] %m%n"/>
</layout>
@@ -58,6 +58,6 @@
<root>
<level value="INFO"/>
- <appender-ref ref="CONSOLE_ERR"/>
+ <appender-ref ref="FILE"/>
</root>
</log4j:configuration>
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index ae6850e..801e270 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -54,6 +54,16 @@ class DiscoreryType(IntEnum):
TCP = 1
+@constructible
+class TxPrepType(IntEnum):
+ """
+ Transaction preparation type.
+ """
+ CELL_COLOCATED = 0
+ CELL_NONCOLOCATED = 1
+ MULTIKEY = 2
+
+
# pylint: disable=W0223
class CellularAffinity(IgniteTest):
"""
@@ -135,11 +145,13 @@ class CellularAffinity(IgniteTest):
# pylint: disable=R0912
# pylint: disable=R0914
# pylint: disable=no-member
+ # pylint: disable=too-many-statements
@cluster(num_nodes=2 * (NODES_PER_CELL + 1) + 3) # cell_cnt * (srv_per_cell + cell_streamer) + zookeper_cluster
@ignite_versions(str(DEV_BRANCH), str(LATEST))
@matrix(stop_type=[StopType.DROP_NETWORK, StopType.SIGKILL, StopType.SIGTERM],
- discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP])
- def test_latency(self, ignite_version, stop_type, discovery_type):
+ discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP],
+ prep_type=[TxPrepType.CELL_COLOCATED])
+ def test_latency(self, ignite_version, stop_type, discovery_type, prep_type):
"""
Tests Cellular switch tx latency.
"""
@@ -169,7 +181,8 @@ class CellularAffinity(IgniteTest):
discovery_spi = from_zookeeper_cluster(zk_quorum)
- cell0, prepared_tx_loader1 = self.start_cell_with_prepared_txs(ignite_version, "C0", discovery_spi, modules)
+ cell0, prepared_tx_loader1 = \
+ self.start_cell_with_prepared_txs(ignite_version, f'C{0}', discovery_spi, modules)
if d_type is DiscoreryType.TCP:
discovery_spi = from_ignite_cluster(cell0)
@@ -179,14 +192,30 @@ class CellularAffinity(IgniteTest):
loaders = [prepared_tx_loader1]
nodes = [cell0]
- for cell in range(1, cells_amount):
+ failed_cell_id = 1
+
+ for cell_id in range(1, cells_amount):
+ # per cell
+ coll_cnt = self.PREPARED_TX_CNT if prep_type == TxPrepType.CELL_COLOCATED else 0
+
+ # should not affect switch speed dramatically, cause recovery but not waiting
+ # avoiding C0 (as not affected) & C1
+ noncoll_cnt = self.PREPARED_TX_CNT * (cells_amount - 2) \
+ if cell_id == failed_cell_id and prep_type == TxPrepType.CELL_NONCOLOCATED else 0
+
+ # cause waiting for txs with failed primary (~ 3/(cells-1) of prepared tx amount)
+ # avoiding C0 (as not affected)
+ multi_cnt = self.PREPARED_TX_CNT * (cells_amount - 1) \
+ if cell_id == failed_cell_id and prep_type == TxPrepType.MULTIKEY else 0
+
node, prepared_tx_loader = \
- self.start_cell_with_prepared_txs(ignite_version, "C%d" % cell, discovery_spi, modules)
+ self.start_cell_with_prepared_txs(
+ ignite_version, f'C{cell_id}', discovery_spi, modules, coll_cnt, noncoll_cnt, multi_cnt)
loaders.append(prepared_tx_loader)
nodes.append(node)
- failed_loader = loaders[1]
+ failed_loader = loaders[failed_cell_id]
for node in [*nodes, *loaders]:
node.await_started()
@@ -259,7 +288,9 @@ class CellularAffinity(IgniteTest):
"warmup": 10000},
modules=modules, startup_timeout_sec=180)
- def start_cell_with_prepared_txs(self, version, cell_id, discovery_spi, modules):
+ # pylint: disable=too-many-arguments
+ def start_cell_with_prepared_txs(
+ self, version, cell_id, discovery_spi, modules, col_cnt=0, noncol_cnt=0, multi_cnt=0):
"""
Starts cell with prepared transactions.
"""
@@ -276,14 +307,16 @@ class CellularAffinity(IgniteTest):
params={"cacheName": CellularAffinity.CACHE_NAME,
"attr": CellularAffinity.ATTRIBUTE,
"cell": cell_id,
- "txCnt": CellularAffinity.PREPARED_TX_CNT},
+ "colocatedTxCnt": col_cnt,
+ "multiTxCnt": multi_cnt,
+ "noncolocatedTxCnt": noncol_cnt},
jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id], modules=modules, startup_timeout_sec=180)
prepared_tx_streamer.start_async() # starts last server node and creates prepared txs on it.
return nodes, prepared_tx_streamer
- # pylint: disable=R0913
+ # pylint: disable=too-many-arguments
def start_cell(self, version, jvm_opts, discovery_spi=None, modules=None, nodes_cnt=NODES_PER_CELL):
"""
Starts cell.
diff --git a/modules/ducktests/tests/ignitetest/tests/self_test.py b/modules/ducktests/tests/ignitetest/tests/self_test.py
index c0fed29..9887e70 100644
--- a/modules/ducktests/tests/ignitetest/tests/self_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/self_test.py
@@ -111,7 +111,7 @@ class SelfTest(IgniteTest):
def get_logs_count(service):
node = service.nodes[0]
- return list(node.account.ssh_capture(f'ls {service.log_dir}/console.log* | wc -l', callback=int))[0]
+ return list(node.account.ssh_capture(f'ls {service.log_dir}/ignite.log* | wc -l', callback=int))[0]
ignites = IgniteService(self.test_context, IgniteConfiguration(version=IgniteVersion(ignite_version)),
num_nodes=1)
@@ -122,16 +122,16 @@ class SelfTest(IgniteTest):
for i in range(num_restarts - 1):
ignites.stop()
- old_cnt = get_log_lines_count(ignites, "console.log")
+ old_cnt = get_log_lines_count(ignites, "ignite.log")
assert old_cnt > 0
ignites.start(clean=False)
- new_cnt = get_log_lines_count(ignites, "console.log")
+ new_cnt = get_log_lines_count(ignites, "ignite.log")
assert new_cnt > 0
# check that there is no new entry in rotated file
- assert old_cnt == get_log_lines_count(ignites, f"console.log.{i + 1}")
+ assert old_cnt == get_log_lines_count(ignites, f"ignite.log.{i + 1}")
assert get_logs_count(ignites) == num_restarts
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index 6530eab..a9e82ac 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -24,6 +24,9 @@ from ducktape.tests.test import Test
# pylint: disable=W0223
from ignitetest.services.utils.ducktests_service import DucktestsService
+# globals:
+JFR_ENABLED = "jfr_enabled"
+
class IgniteTest(Test):
"""
@@ -45,20 +48,22 @@ class IgniteTest(Test):
return monotonic()
def tearDown(self):
- self.logger.debug("Killing all runned services to speed-up the tearing down.")
+ # jfr requires graceful shutdown to save the recording.
+ if not self.test_context.globals.get(JFR_ENABLED, False):
+ self.logger.debug("Killing all runned services to speed-up the tearing down.")
- # pylint: disable=W0212
- for service in self.test_context.services._services.values():
- assert isinstance(service, DucktestsService)
+ # pylint: disable=W0212
+ for service in self.test_context.services._services.values():
+ assert isinstance(service, DucktestsService)
- try:
- service.kill()
- except RemoteCommandError:
- pass # Process may be already self-killed on segmentation.
+ try:
+ service.kill()
+ except RemoteCommandError:
+ pass # Process may be already self-killed on segmentation.
- assert service.stopped
+ assert service.stopped
- self.logger.debug("All runned services killed.")
+ self.logger.debug("All runned services killed.")
super().tearDown()