You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/09/15 14:26:39 UTC
[ignite] branch ignite-ducktape updated: IGNITE-13429 Integration
test of control.sh transactions' management (#8239)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new af0f0a7 IGNITE-13429 Integration test of control.sh transactions' management (#8239)
af0f0a7 is described below
commit af0f0a7b13c3e9b78900c818bc9ca4040d927953
Author: Ivan Daschinskiy <iv...@gmail.com>
AuthorDate: Tue Sep 15 17:26:18 2020 +0300
IGNITE-13429 Integration test of control.sh transactions' management (#8239)
---
.../LongRunningTransactionsGenerator.java | 157 +++++++++++++++
.../ducktest/utils/IgniteAwareApplication.java | 6 +-
.../tests/ignitetest/services/ignite_app.py | 15 +-
.../ignitetest/services/utils/control_utility.py | 221 +++++++++++++++++++--
.../fast_suite.yml => control_utility/__init__.py} | 17 +-
.../baseline_test.py} | 12 +-
.../ignitetest/tests/control_utility/tx_test.py | 192 ++++++++++++++++++
.../tests/ignitetest/tests/suites/fast_suite.yml | 2 +-
modules/ducktests/tests/setup.py | 2 +-
modules/ducktests/tests/tox.ini | 1 +
10 files changed, 582 insertions(+), 43 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java
new file mode 100644
index 0000000..3bbf732c
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.control_utility;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import javax.cache.CacheException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+/**
+ * Run long running transactions on node with specified param.
+ */
+public class LongRunningTransactionsGenerator extends IgniteAwareApplication {
+ /** */
+ private static final Duration TOPOLOGY_WAIT_TIMEOUT = Duration.ofSeconds(60);
+
+ /** */
+ private static final String KEYS_LOCKED_MESSAGE = "APPLICATION_KEYS_LOCKED";
+
+ /** */
+ private static final String LOCKED_KEY_PREFIX = "KEY_";
+
+ /** */
+ private volatile Executor pool;
+
+ /** {@inheritDoc} */
+ @Override protected void run(JsonNode jsonNode) throws Exception {
+ IgniteCache<String, String> cache = ignite.cache(jsonNode.get("cache_name").asText());
+
+ int txCount = jsonNode.get("tx_count") != null ? jsonNode.get("tx_count").asInt() : 1;
+
+ int txSize = jsonNode.get("tx_size") != null ? jsonNode.get("tx_size").asInt() : 1;
+
+ String keyPrefix = jsonNode.get("key_prefix") != null ? jsonNode.get("key_prefix").asText() : LOCKED_KEY_PREFIX;
+
+ String label = jsonNode.get("label") != null ? jsonNode.get("label").asText() : null;
+
+ long expectedTopologyVersion = jsonNode.get("wait_for_topology_version") != null ?
+ jsonNode.get("wait_for_topology_version").asLong() : -1L;
+
+ CountDownLatch lockLatch = new CountDownLatch(txCount);
+
+ pool = Executors.newFixedThreadPool(2 * txCount);
+
+ markInitialized();
+
+ if (expectedTopologyVersion > 0) {
+ log.info("Start waiting for topology version: " + expectedTopologyVersion + ", " +
+ "current version is: " + ignite.cluster().topologyVersion());
+
+ long start = System.nanoTime();
+
+ while (ignite.cluster().topologyVersion() < expectedTopologyVersion
+ && Duration.ofNanos(start - System.nanoTime()).compareTo(TOPOLOGY_WAIT_TIMEOUT) < 0)
+ Thread.sleep(100L);
+
+ log.info("Finished waiting for topology version: " + expectedTopologyVersion + ", " +
+ "current version is: " + ignite.cluster().topologyVersion());
+ }
+
+ for (int i = 0; i < txCount; i++) {
+ String key = keyPrefix + i;
+
+ pool.execute(() -> {
+ Lock lock = cache.lock(key);
+
+ lock.lock();
+
+ try {
+ lockLatch.countDown();
+
+ while (!terminated())
+ Thread.sleep(100L);
+ }
+ catch (InterruptedException e) {
+ markBroken(new RuntimeException("Unexpected thread interruption", e));
+
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ });
+ }
+
+ lockLatch.await();
+
+ log.info(KEYS_LOCKED_MESSAGE);
+
+ CountDownLatch txLatch = new CountDownLatch(txCount);
+
+ for (int i = 0; i < txCount; i++) {
+ Map<String, String> data = new TreeMap<>();
+
+ for (int j = 0; j < txSize; j++) {
+ String key = keyPrefix + (j == 0 ? String.valueOf(i) : i + "_" + j);
+
+ data.put(key, key);
+ }
+
+ IgniteTransactions igniteTransactions = label != null ? ignite.transactions().withLabel(label) :
+ ignite.transactions();
+
+ pool.execute(() -> {
+ IgniteUuid xid = null;
+
+ try (Transaction tx = igniteTransactions.txStart()) {
+ xid = tx.xid();
+
+ cache.putAll(data);
+
+ tx.commit();
+ }
+ catch (Exception e) {
+ if (e instanceof CacheException && e.getCause() != null &&
+ e.getCause() instanceof TransactionRollbackException)
+ recordResult("TX_ID", xid != null ? xid.toString() : "");
+ else
+ markBroken(new RuntimeException("Transaction is rolled back with unexpected error", e));
+ }
+ finally {
+ txLatch.countDown();
+ }
+ });
+ }
+
+ txLatch.await();
+
+ markFinished();
+ }
+}
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index a74b53f..35760b3 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -144,18 +144,18 @@ public abstract class IgniteAwareApplication {
/**
*
*/
- protected void markBroken(Throwable th) {
+ public void markBroken(Throwable th) {
log.info("Marking as broken.");
synchronized (stateMux) {
+ recordResult("ERROR", th.toString());
+
if (broken) {
log.info("Already marked as broken.");
return;
}
- recordResult("ERROR", th.toString());
-
assert !finished;
log.error(APP_BROKEN);
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index 9e396f2..74b7fba 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -107,12 +107,23 @@ class IgniteApplicationService(IgniteAwareService):
:param name: Result parameter's name.
:return: Extracted result of application run.
"""
- res = ""
+ results = self.extract_results(name)
+
+ assert len(results) <= 1, f"Expected exactly one result occurence, {len(results)} found."
+
+ return results[0] if results else ""
+
+ def extract_results(self, name):
+ """
+ :param name: Results parameter's name.
+ :return: Extracted results of application run.
+ """
+ res = []
output = self.nodes[0].account.ssh_capture(
"grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False)
for line in output:
- res = re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1)
+ res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1))
return res
diff --git a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
index 4b194e6..5e65bca 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
@@ -16,9 +16,11 @@
"""
This module contains control utility wrapper.
"""
+
import random
import re
-from collections import namedtuple
+import time
+from typing import NamedTuple
from ducktape.cluster.remoteaccount import RemoteCommandError
@@ -52,10 +54,10 @@ class ControlUtility:
:param baseline: Baseline nodes or topology version to set as baseline.
"""
if isinstance(baseline, int):
- result = self.__run("--baseline version %d --yes" % baseline)
+ result = self.__run(f"--baseline version {baseline} --yes")
else:
- result = self.__run("--baseline set %s --yes" %
- ",".join([node.account.externally_routable_ip for node in baseline]))
+ result = self.__run(
+ f"--baseline set {','.join([node.account.externally_routable_ip for node in baseline])} --yes")
return self.__parse_cluster_state(result)
@@ -63,8 +65,8 @@ class ControlUtility:
"""
:param nodes: Nodes that should be added to baseline.
"""
- result = self.__run("--baseline add %s --yes" %
- ",".join([node.account.externally_routable_ip for node in nodes]))
+ result = self.__run(
+ f"--baseline add {','.join([node.account.externally_routable_ip for node in nodes])} --yes")
return self.__parse_cluster_state(result)
@@ -72,8 +74,8 @@ class ControlUtility:
"""
:param nodes: Nodes that should be removed to baseline.
"""
- result = self.__run("--baseline remove %s --yes" %
- ",".join([node.account.externally_routable_ip for node in nodes]))
+ result = self.__run(
+ f"--baseline remove {','.join([node.account.externally_routable_ip for node in nodes])} --yes")
return self.__parse_cluster_state(result)
@@ -88,8 +90,8 @@ class ControlUtility:
Enable baseline auto adjust.
:param timeout: Auto adjust timeout in millis.
"""
- timeout_str = "timeout %d" % timeout if timeout else ""
- return self.__run("--baseline auto_adjust enable %s --yes" % timeout_str)
+ timeout_str = f"timeout {timeout}" if timeout else ""
+ return self.__run(f"--baseline auto_adjust enable {timeout_str} --yes")
def activate(self):
"""
@@ -103,6 +105,124 @@ class ControlUtility:
"""
return self.__run("--deactivate --yes")
+ def tx(self, **kwargs):
+ """
+ Get list of transactions, various filters can be applied.
+ """
+ output = self.__run(self.__tx_command(**kwargs))
+ res = self.__parse_tx_list(output)
+ return res if res else output
+
+ def tx_info(self, xid):
+ """
+ Get verbose transaction info by xid.
+ """
+ return self.__parse_tx_info(self.__run(f"--tx --info {xid}"))
+
+ def tx_kill(self, **kwargs):
+ """
+ Kill transaction by xid or by various filter.
+ """
+ output = self.__run(self.__tx_command(kill=True, **kwargs))
+ res = self.__parse_tx_list(output)
+ return res if res else output
+
+ @staticmethod
+ def __tx_command(**kwargs):
+ tokens = ["--tx"]
+
+ if 'xid' in kwargs:
+ tokens.append(f"--xid {kwargs['xid']}")
+
+ if kwargs.get('clients'):
+ tokens.append("--clients")
+
+ if kwargs.get('servers'):
+ tokens.append("--servers")
+
+ if 'min_duration' in kwargs:
+ tokens.append(f"--min-duration {kwargs.get('min_duration')}")
+
+ if 'min_size' in kwargs:
+ tokens.append(f"--min-size {kwargs.get('min_size')}")
+
+ if 'label_pattern' in kwargs:
+ tokens.append(f"--label {kwargs['label_pattern']}")
+
+ if kwargs.get("nodes"):
+ tokens.append(f"--nodes {','.join(kwargs.get('nodes'))}")
+
+ if 'limit' in kwargs:
+ tokens.append(f"--limit {kwargs['limit']}")
+
+ if 'order' in kwargs:
+ tokens.append(f"--order {kwargs['order']}")
+
+ if kwargs.get('kill'):
+ tokens.append("--kill --yes")
+
+ return " ".join(tokens)
+
+ @staticmethod
+ def __parse_tx_info(output):
+ tx_info_pattern = re.compile(
+ "Near XID version: (?P<xid_full>GridCacheVersion \\[topVer=\\d+, order=\\d+, nodeOrder=\\d+\\])\\n\\s+"
+ "Near XID version \\(UUID\\): (?P<xid>[^\\s]+)\\n\\s+"
+ "Isolation: (?P<isolation>[^\\s]+)\\n\\s+"
+ "Concurrency: (?P<concurrency>[^\\s]+)\\n\\s+"
+ "Timeout: (?P<timeout>\\d+)\\n\\s+"
+ "Initiator node: (?P<initiator_id>[^\\s]+)\\n\\s+"
+ "Initiator node \\(consistent ID\\): (?P<initiator_consistent_id>[^\\s+]+)\\n\\s+"
+ "Label: (?P<label>[^\\s]+)\\n\\s+Topology version: AffinityTopologyVersion "
+ "\\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\]\\n\\s+"
+ "Used caches \\(ID to name\\): {(?P<caches>.*)}\\n\\s+"
+ "Used cache groups \\(ID to name\\): {(?P<cache_groups>.*)}\\n\\s+"
+ "States across the cluster: \\[(?P<states>.*)\\]"
+ )
+
+ match = tx_info_pattern.search(output)
+
+ str_fields = ['xid', 'xid_full', 'label', 'timeout', 'isolation', 'concurrency', 'initiator_id',
+ 'initiator_consistent_id']
+ dict_fields = ['caches', 'cache_groups']
+
+ if match:
+ kwargs = {v: match.group(v) for v in str_fields}
+ kwargs['timeout'] = int(match.group('timeout'))
+ kwargs.update({v: parse_dict(match.group(v)) for v in dict_fields})
+ kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
+ kwargs['states'] = parse_list(match.group('states'))
+
+ return TxVerboseInfo(**kwargs)
+
+ return None
+
+ @staticmethod
+ def __parse_tx_list(output):
+ tx_pattern = re.compile(
+ "Tx: \\[xid=(?P<xid>[^\\s]+), "
+ "label=(?P<label>[^\\s]+), state=(?P<state>[^\\s]+), "
+ "startTime=(?P<start_time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}), duration=(?P<duration>\\d+), "
+ "isolation=(?P<isolation>[^\\s]+), concurrency=(?P<concurrency>[^\\s]+), "
+ "topVer=AffinityTopologyVersion \\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\], "
+ "timeout=(?P<timeout>\\d+), size=(?P<size>\\d+), dhtNodes=\\[(?P<dht_nodes>.*)\\], "
+ "nearXid=(?P<near_xid>[^\\s]+), parentNodeIds=\\[(?P<parent_nodes>.*)\\]\\]")
+
+ str_fields = ['xid', 'label', 'state', 'isolation', 'concurrency', 'near_xid']
+ int_fields = ['timeout', 'size', 'duration']
+ list_fields = ['parent_nodes', 'dht_nodes']
+
+ tx_list = []
+ for match in tx_pattern.finditer(output):
+ kwargs = {v: match.group(v) for v in str_fields}
+ kwargs.update({v: int(match.group(v)) for v in int_fields})
+ kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
+ kwargs.update({v: parse_list(match.group(v)) for v in list_fields})
+ kwargs['start_time'] = time.strptime(match.group('start_time'), "%Y-%m-%d %H:%M:%S.%f")
+ tx_list.append(TxInfo(**kwargs))
+
+ return tx_list
+
@staticmethod
def __parse_cluster_state(output):
state_pattern = re.compile("Cluster state: (?P<cluster_state>[^\\s]+)")
@@ -127,12 +247,12 @@ class ControlUtility:
def __run(self, cmd):
node = random.choice(self.__alives())
- self.logger.debug("Run command %s on node %s", cmd, node.name)
+ self.logger.debug(f"Run command {cmd} on node {node.name}")
raw_output = node.account.ssh_capture(self.__form_cmd(node, cmd), allow_fail=True)
code, output = self.__parse_output(raw_output)
- self.logger.debug("Output of command %s on node %s, exited with code %d, is %s", cmd, node.name, code, output)
+ self.logger.debug(f"Output of command {cmd} on node {node.name}, exited with code {code}, is {output}")
if code != 0:
raise ControlUtilityError(node.account, cmd, code, output)
@@ -140,8 +260,7 @@ class ControlUtility:
return output
def __form_cmd(self, node, cmd):
- return self._cluster.spec.path.script("%s --host %s %s" %
- (self.BASE_COMMAND, node.account.externally_routable_ip, cmd))
+ return self._cluster.spec.path.script(f"{self.BASE_COMMAND} --host {node.account.externally_routable_ip} {cmd}")
@staticmethod
def __parse_output(raw_output):
@@ -159,8 +278,59 @@ class ControlUtility:
return [node for node in self._cluster.nodes if self._cluster.alive(node)]
-BaselineNode = namedtuple("BaselineNode", ["consistent_id", "state", "order"])
-ClusterState = namedtuple("ClusterState", ["state", "topology_version", "baseline"])
+class BaselineNode(NamedTuple):
+ """
+ Baseline node info.
+ """
+ consistent_id: str
+ state: str
+ order: int
+
+
+class ClusterState(NamedTuple):
+ """
+ Cluster state info.
+ """
+ state: str
+ topology_version: int
+ baseline: list
+
+
+class TxInfo(NamedTuple):
+ """
+ Transaction info.
+ """
+ xid: str
+ near_xid: str
+ label: str
+ state: str
+ start_time: time.struct_time
+ duration: int
+ isolation: str
+ concurrency: str
+ top_ver: tuple
+ timeout: int
+ size: int
+ dht_nodes: list = []
+ parent_nodes: list = []
+
+
+class TxVerboseInfo(NamedTuple):
+ """
+ Transaction info returned with --info
+ """
+ xid: str
+ xid_full: str
+ label: str
+ isolation: str
+ concurrency: str
+ timeout: int
+ top_ver: tuple
+ initiator_id: str
+ initiator_consistent_id: str
+ caches: dict
+ cache_groups: dict
+ states: list
class ControlUtilityError(RemoteCommandError):
@@ -169,3 +339,22 @@ class ControlUtilityError(RemoteCommandError):
"""
def __init__(self, account, cmd, exit_status, output):
super().__init__(account, cmd, exit_status, "".join(output))
+
+
+def parse_dict(raw):
+ """
+ Parse java Map.toString() to python dict.
+ """
+ res = {}
+ for token in raw.split(','):
+ key, value = tuple(token.strip().split('='))
+ res[key] = value
+
+ return res
+
+
+def parse_list(raw):
+ """
+ Parse java List.toString() to python list
+ """
+ return [token.strip() for token in raw.split(',')]
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
similarity index 77%
copy from modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
copy to modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
index 4811574..1540a34 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
@@ -13,17 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-smoke:
- - ../smoke_test.py
-
-control_utility:
- - ../control_utility_test.py
-
-pme_free_switch:
- - ../pme_free_switch_test.py
-
-cellular_affinity:
- - ../cellular_affinity_test.py
-
-rebalance:
- - ../add_node_rebalance_test.py
+"""
+This package contains control.sh utility tests.
+"""
diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
similarity index 95%
rename from modules/ducktests/tests/ignitetest/tests/control_utility_test.py
rename to modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
index a30a381..a1de09f 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
@@ -14,7 +14,7 @@
# limitations under the License.
"""
-This module contains control.sh utility tests.
+This module contains manipulating baseline test through control utility.
"""
from ducktape.mark.resource import cluster
@@ -27,7 +27,7 @@ from ignitetest.services.utils.ignite_configuration.data_storage import DataRegi
from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
from ignitetest.utils import version_if, ignite_versions
from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, LATEST_2_7, V_2_8_0
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, V_2_8_0
# pylint: disable=W0223
@@ -38,7 +38,7 @@ class BaselineTests(IgniteTest):
NUM_NODES = 3
@cluster(num_nodes=NUM_NODES)
- @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
def test_baseline_set(self, ignite_version):
"""
Test baseline set.
@@ -74,7 +74,7 @@ class BaselineTests(IgniteTest):
self.__check_nodes_in_baseline(new_node.nodes, baseline)
@cluster(num_nodes=NUM_NODES)
- @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
def test_baseline_add_remove(self, ignite_version):
"""
Test add and remove nodes from baseline.
@@ -116,7 +116,7 @@ class BaselineTests(IgniteTest):
self.__check_nodes_not_in_baseline(new_node.nodes, baseline)
@cluster(num_nodes=NUM_NODES)
- @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
def test_activate_deactivate(self, ignite_version):
"""
Test activate and deactivate cluster.
@@ -139,7 +139,7 @@ class BaselineTests(IgniteTest):
@cluster(num_nodes=NUM_NODES)
@version_if(lambda version: version >= V_2_8_0)
- @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
def test_baseline_autoadjust(self, ignite_version):
"""
Test activate and deactivate cluster.
diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
new file mode 100644
index 0000000..5aca06e
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
@@ -0,0 +1,192 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains transactions manipulation test through control utility.
+"""
+import random
+
+from ducktape.mark.resource import cluster
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster
+from ignitetest.utils import ignite_versions
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion
+
+
+# pylint: disable=W0223
+class TransactionsTests(IgniteTest):
+ """
+ Tests control.sh transaction management command.
+ """
+ NUM_NODES = 4
+ CACHE_NAME = "TEST"
+
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ def test_tx_info(self, ignite_version):
+ """
+ Tests verbose tx info for specific xid.
+ """
+ servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+ long_tx = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=2, tx_size=2,
+ key_prefix='TX_1_KEY_')
+
+ wait_for_key_locked(long_tx)
+
+ control_utility = ControlUtility(servers, self.test_context)
+
+ transactions = control_utility.tx()
+
+ pick_tx = random.choice(transactions)
+
+ res = control_utility.tx_info(pick_tx.xid)
+
+ assert res.xid == pick_tx.xid
+ assert res.timeout == pick_tx.timeout
+ assert res.top_ver == pick_tx.top_ver
+ assert res.label == pick_tx.label
+
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ def test_kill_tx(self, ignite_version):
+ """
+ Test kill transactions by xid and filter.
+ """
+ servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+ tx_count = 3
+
+ long_tx_1 = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=tx_count,
+ tx_size=2, key_prefix='TX_1_KEY_', label='TX_1', wait_for_topology_version=4)
+
+ long_tx_2 = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=tx_count,
+ tx_size=2, key_prefix='TX_2_KEY_', label='TX_2', wait_for_topology_version=4)
+
+ wait_for_key_locked(long_tx_1, long_tx_2)
+
+ control_utility = ControlUtility(servers, self.test_context)
+
+ # check kill with specific xid.
+ transactions = control_utility.tx(label_pattern='TX_1')
+ res = control_utility.tx_kill(xid=random.choice(transactions).xid)
+ assert res and len(res) == 1 and res[0].xid == long_tx_1.extract_result("TX_ID")
+
+ # check kill with filter.
+ res = control_utility.tx_kill(label_pattern='TX_2')
+ assert res and len(res) == tx_count and set(map(lambda x: x.xid, res))\
+ .issubset(set(long_tx_2.extract_results("TX_ID")))
+
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ def test_tx_filter(self, ignite_version):
+ """
+ Test filtering transactions list.
+ """
+ servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+ client_tx_count, client_tx_size = 5, 4
+ server_tx_count, server_tx_size = 3, 2
+
+ servers = self.__start_tx_app(ignite_version, servers, client_mode=False, cache_name=self.CACHE_NAME,
+ tx_count=server_tx_count, tx_size=server_tx_size, key_prefix='TX_1_KEY_',
+ label='LBL_SERVER', wait_for_topology_version=4)
+
+ clients = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=client_tx_count,
+ tx_size=client_tx_size, key_prefix='TX_2_KEY_', label='LBL_CLIENT',
+ wait_for_topology_version=4)
+
+ wait_for_key_locked(clients, servers)
+ control_utility = ControlUtility(servers, self.test_context)
+
+ start_check = self.monotonic()
+ assert len(control_utility.tx(clients=True, label_pattern='LBL_.*')) == client_tx_count
+ assert len(control_utility.tx(servers=True, label_pattern='LBL_.*')) == server_tx_count
+
+ # limit to 2 transactions on each node, therefore 4 total.
+ assert len(control_utility.tx(limit=2, label_pattern='LBL_.*')) == 4
+
+ assert len(control_utility.tx(label_pattern='LBL_.*')) == client_tx_count + server_tx_count
+
+ # filter transactions with keys size greater or equal to min_size.
+ assert len(control_utility.tx(min_size=client_tx_size, label_pattern='LBL_.*')) == client_tx_count
+
+ server_nodes = [node.consistent_id for node in servers.nodes]
+ assert len(control_utility.tx(label_pattern='LBL_.*', nodes=server_nodes)) == server_tx_count
+
+ # test ordering.
+ for order_type in ['DURATION', 'SIZE', 'START_TIME']:
+ transactions = control_utility.tx(label_pattern='LBL_.*', order=order_type)
+ assert is_sorted(transactions, key=lambda x, attr=order_type: getattr(x, attr.lower()), reverse=True)
+
+ # test min_duration filtering.
+ min_duration = int(self.monotonic() - start_check)
+ transactions = control_utility.tx(min_duration=min_duration, label_pattern='LBL_.*')
+ assert len(transactions) == server_tx_count + client_tx_count
+ for tx in transactions:
+ assert tx.duration >= min_duration
+
+ def __start_tx_app(self, version, servers, *, client_mode=True, **kwargs):
+ app_params = {
+ 'config': IgniteConfiguration(version=IgniteVersion(version),
+ client_mode=client_mode,
+ discovery_spi=from_ignite_cluster(servers)),
+ 'java_class_name': 'org.apache.ignite.internal.ducktest.tests.control_utility'
+ '.LongRunningTransactionsGenerator',
+ 'params': kwargs
+ }
+
+ app = IgniteApplicationService(self.test_context, **app_params)
+ app.start()
+
+ return app
+
+ def __start_ignite_nodes(self, version, num_nodes, timeout_sec=60):
+ config = IgniteConfiguration(
+ cluster_state="ACTIVE",
+ version=IgniteVersion(version),
+ caches=[CacheConfiguration(name=self.CACHE_NAME, atomicity_mode='TRANSACTIONAL')]
+ )
+
+ servers = IgniteService(self.test_context, config=config, num_nodes=num_nodes)
+
+ servers.start(timeout_sec=timeout_sec)
+
+ return servers
+
+
+def wait_for_key_locked(*clusters):
+ """
+ Wait for APPLICATION_KEYS_LOCKED on tx_app nodes.
+ """
+ for cluster_ in clusters:
+ cluster_.await_event("APPLICATION_KEYS_LOCKED", timeout_sec=60, from_the_beginning=True)
+
+
+def is_sorted(lst, key=lambda x: x, reverse=False):
+ """
+ Check if list is sorted.
+ """
+ for i, elem in enumerate(lst[1:]):
+ return key(elem) <= key(lst[i]) if not reverse else key(elem) >= key(lst[i])
+
+ return True
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
index 4811574..698c1d8 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
@@ -17,7 +17,7 @@ smoke:
- ../smoke_test.py
control_utility:
- - ../control_utility_test.py
+ - ../control_utility
pme_free_switch:
- ../pme_free_switch_test.py
diff --git a/modules/ducktests/tests/setup.py b/modules/ducktests/tests/setup.py
index 5b89d58..de849ed 100644
--- a/modules/ducktests/tests/setup.py
+++ b/modules/ducktests/tests/setup.py
@@ -30,7 +30,7 @@ setup(name="ignitetest",
license="apache2.0",
packages=find_packages(exclude=["ignitetest.tests", "ignitetest.tests.*"]),
include_package_data=True,
- install_requires=['ducktape==0.8.0'],
+ install_requires=["ducktape==0.8.0", "tox==3.15.2"],
dependency_links=[
'https://github.com/confluentinc/ducktape/tarball/master#egg=ducktape-0.8.0'
])
diff --git a/modules/ducktests/tests/tox.ini b/modules/ducktests/tests/tox.ini
index 45107be..7955108 100644
--- a/modules/ducktests/tests/tox.ini
+++ b/modules/ducktests/tests/tox.ini
@@ -33,6 +33,7 @@ commands =
[BASIC]
min-public-methods=0
+good-names=i,j,k,x,y,ex,pk,tx
[SIMILARITIES]
ignore-imports=yes