You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/12/01 09:05:32 UTC
[ignite-3] 01/01: wip. Rewrite dynamic tests to parameterized.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-18171
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d8623353ca5c6102e9f5effb7ae6791b872cba6d
Author: amashenkov <an...@gmail.com>
AuthorDate: Wed Nov 30 19:51:41 2022 +0300
wip. Rewrite dynamic tests to parameterized.
---
.../cluster/AbstractClusterStartStopTest.java | 289 ++++++++++
.../internal/cluster/ItClusterStartupTest.java | 314 ++++++++++
.../ignite/internal/cluster/ItNodeRestartTest.java | 601 ++++++--------------
.../internal/cluster/ItNodeStartStopTest.java | 630 ---------------------
4 files changed, 781 insertions(+), 1053 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java
new file mode 100644
index 0000000000..446384836f
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java
@@ -0,0 +1,289 @@
+package org.apache.ignite.internal.cluster;
+
+import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.micronaut.configuration.picocli.MicronautFactory;
+import io.micronaut.context.ApplicationContext;
+import io.micronaut.context.env.Environment;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.tx.Transaction;
+import org.hamcrest.text.IsEmptyString;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.extension.ExtendWith;
+import picocli.CommandLine;
+
+
+@ExtendWith(WorkDirectoryExtension.class)
+@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true")
+abstract class AbstractClusterStartStopTest extends BaseIgniteAbstractTest {
+ protected static final int NODE_JOIN_WAIT_TIMEOUT = 2_000;
+
+ /** Work directory. */
+ @WorkDirectory
+ protected static Path WORK_DIR;
+
+ /** Addresses for the node finder. */
+ protected static final String connectionAddr = "\"localhost:3344\", \"localhost:3345\", \"localhost:3346\"";
+
+ /** Correct ignite cluster url. */
+ protected static final String NODE_URL = "http://localhost:10300";
+
+ /** Cluster management group node name. */
+ protected static final String CMG_NODE = "node1";
+ /** MetaStorage group node name. */
+ protected static final String METASTORAGE_NODE = "node3";
+ /** Data node 1 name. */
+ protected static final String DATA_NODE = "node2"; // Partition leader.
+ /** Data node 2 name. */
+ protected static final String DATA_NODE_2 = "node4";
+ /** New node name. */
+ protected static final String NEW_NODE = "newNode";
+
+ /** Nodes configurations. */
+ protected static final Map<String, String> nodesCfg = Map.of(
+ "node1", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3344,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node2", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3345,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node3", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3346,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node4", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3347,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "newNode", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3348,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+
+ // TODO: Change Map -> Set.
+ // A Map is used because node names are used for partition affinity calculation,
+ // and we can't guarantee that a node named "DATA_NODE" will own a partition.
+ protected static final Map<String, String> nodeAliasToNameMapping = Map.of(
+ "C", CMG_NODE,
+ "M", METASTORAGE_NODE,
+ "D", DATA_NODE,
+ "D2", DATA_NODE_2
+ );
+
+ /** Resolves node alias to node name. */
+ protected static String resolve(String nodeAliases) {
+ return nodeAliasToNameMapping.get(nodeAliases);
+ }
+
+ /** Cluster nodes. */
+ protected final Map<String, CompletableFuture<Ignite>> clusterNodes = new HashMap<>();
+
+ /** Starts the given nodes and initializes the grid. */
+ protected List<CompletableFuture<Ignite>> initGrid(Collection<String> nodes) throws Exception {
+ List<CompletableFuture<Ignite>> futures = startNodes(nodes);
+
+ // Init cluster.
+ IgnitionManager.init(CMG_NODE, List.of(METASTORAGE_NODE), List.of(CMG_NODE), "cluster");
+
+ for (CompletableFuture<Ignite> future : futures) {
+ assertThat(future, willCompleteSuccessfully());
+ }
+
+ // Create tables.
+ IgniteImpl node = (IgniteImpl) futures.get(0).join();
+ sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH partitions = 1, replicas = 1");
+
+ for (CompletableFuture<Ignite> f : futures) {
+ ReplicaManager replicaMgr = (ReplicaManager) MethodHandles.privateLookupIn(IgniteImpl.class, MethodHandles.lookup())
+ .findVarHandle(IgniteImpl.class, "replicaMgr", ReplicaManager.class)
+ .get(f.get());
+
+ assertTrue(DATA_NODE.equals(f.get().name()) ^ replicaMgr.startedGroups().isEmpty());
+ }
+
+ sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)");
+
+ return futures;
+ }
+
+ protected List<CompletableFuture<Ignite>> startNodes(Collection<String> names) {
+ return names.stream()
+ .map(this::startNode)
+ .collect(Collectors.toList());
+ }
+
+
+ protected void stopAllNodes() {
+ List<String> names0 = List.copyOf(clusterNodes.keySet());
+
+ for (int i = names0.size() - 1; i >= 0; i--) {
+ stopNode(names0.get(i));
+ }
+ }
+
+ protected CompletableFuture<Ignite> startNode(String nodeName) {
+ String nodeConfig = nodesCfg.get(nodeName);
+
+ CompletableFuture<Ignite> fut = IgnitionManager.start(nodeName, nodeConfig, WORK_DIR.resolve(nodeName));
+
+ clusterNodes.put(nodeName, fut);
+
+ return fut;
+ }
+
+ protected void stopNode(String nodeName) {
+ CompletableFuture<Ignite> rmv = clusterNodes.remove(nodeName);
+
+ assert rmv != null;
+
+ IgnitionManager.stop(nodeName);
+ }
+
+ protected boolean isNodeStarted(String n) {
+ return clusterNodes.containsKey(n);
+ }
+
+ protected static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) {
+ var queryEngine = ((IgniteImpl) node).queryEngine();
+
+ SessionId sessionId = queryEngine.createSession(5_000, PropertiesHolder.fromMap(
+ Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+ ));
+
+ try {
+ var context = tx != null ? QueryContext.of(tx) : QueryContext.of();
+
+ return getAllFromCursor(
+ await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+ );
+ } finally {
+ queryEngine.closeSession(sessionId);
+ }
+ }
+
+ protected static boolean logicalTopologyContainsNode(String nodeId) {
+ return nodeInTopology("logical", nodeId);
+ }
+
+ protected static boolean physicalTopologyContainsNode(String nodeId) {
+ return nodeInTopology("physical", nodeId);
+ }
+
+ private static boolean nodeInTopology(String topologyType, String nodeId) {
+ StringWriter out = new StringWriter();
+ StringWriter err = new StringWriter();
+
+ new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST)))
+ .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
+ .setOut(new PrintWriter(out, true))
+ .setErr(new PrintWriter(err, true))
+ .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
+
+ assertThat(err.toString(), IsEmptyString.emptyString());
+ // Quote the identifier so that it is matched literally, as a whole word, in the CLI output.
+ return Pattern.compile("\\b" + Pattern.quote(nodeId) + "\\b").matcher(out.toString()).find();
+ }
+
+ /**
+ * Grids configurations generator.
+ */
+ protected static class GridGenerator {
+ private final LinkedHashSet<String> currentGrid = new LinkedHashSet<>();
+ private final List<List<String>> gridStartSequences = new ArrayList<>();
+ private final BiPredicate<String, Set<String>> nodeFilter;
+ private final Predicate<Set<String>> gridFilter;
+ private Collection<String> nodeNames;
+
+ protected GridGenerator(Set<String> nodeNames, BiPredicate<String, Set<String>> nodeFilter, Predicate<Set<String>> gridFilter) {
+ this.nodeNames = nodeNames;
+ this.nodeFilter = nodeFilter;
+ this.gridFilter = gridFilter;
+ }
+
+ /** Generates tests execution sequence recursively. */
+ List<List<String>> generate() {
+ generate0(nodeNames);
+
+ return gridStartSequences;
+ }
+
+ /** Generates tests execution sequence recursively. */
+ private void generate0(Collection<String> availableNodes) {
+ if (gridFilter.test(currentGrid)) {
+ gridStartSequences.add(new ArrayList<>(currentGrid)); // Copy mutable collection.
+ }
+
+ for (String node : availableNodes) {
+ if (!nodeFilter.test(node, currentGrid)) {
+ continue; // Skip node from adding to the current grid.
+ }
+
+ currentGrid.add(node);
+
+ HashSet<String> unusedNodes = new HashSet<>(availableNodes);
+ unusedNodes.remove(node);
+
+ generate0(unusedNodes);
+
+ currentGrid.remove(node);
+ }
+ }
+
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java
new file mode 100644
index 0000000000..09aff7d486
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.Transaction;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+/**
+ * Test node start/stop in different scenarios and validate grid components behavior depending on availability/absence of quorums.
+ */
+// TODO: Fix expected messages in assertThrows.
+// TODO: Create 2 distribution zones, one spanning a single node and one spanning both data nodes, and tables in these zones.
+@SuppressWarnings("ThrowableNotThrown")
+public class ItClusterStartupTest extends AbstractClusterStartStopTest {
+ @BeforeEach
+ public void before() throws Exception {
+ for (String name : nodesCfg.keySet()) {
+ IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
+ }
+
+ initGrid(nodeAliasToNameMapping.values());
+
+ // Shutdown cluster.
+ stopAllNodes();
+ }
+
+ /** Runs after each test sequence. */
+ @AfterEach
+ public void afterEach() {
+ stopAllNodes();
+
+ for (String name : nodesCfg.keySet()) {
+ IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
+ }
+ }
+
+ /**
+ * Generate node start sequences.
+ *
+ * @return JUnit tests.
+ */
+ static Object[] generateParameters() {
+ return new GridGenerator(
+ nodeAliasToNameMapping.keySet(),
+ (name, grid) -> (!grid.isEmpty() || "C".equals(name)) // CMG node always starts first.
+ && (!"D2".equals(name) || grid.contains("D")), // Data nodes are interchangeable.
+ grid -> grid.size() == nodeAliasToNameMapping.size()
+ ).generate().toArray(Object[]::new);
+ }
+
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testNodeJoin(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkNodeJoin();
+ }
+ }
+
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testCreateTable(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkCreateTable();
+ }
+ }
+
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testImplicitTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkImplicitTx();
+ }
+ }
+
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testReadWriteTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkTxRW();
+ }
+ }
+
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testReadOnlyTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkTxRO();
+ }
+ }
+
+ private void checkNodeJoin() {
+ // Start outside of "try": the "finally" block must only unregister a node that startNode() has registered.
+ CompletableFuture<Ignite> fut = startNode(NEW_NODE);
+ try {
+ if (!clusterNodes.containsKey(CMG_NODE)) {
+ assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+ assertTrue(physicalTopologyContainsNode(NEW_NODE));
+ // CMG, which holds logical topology state, is unavailable.
+ assertThrowsWithCause(() -> logicalTopologyContainsNode(NEW_NODE), IgniteException.class);
+
+ return;
+ } else if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
+ // Node future can't complete as some components require Metastorage on start.
+ assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+ assertTrue(physicalTopologyContainsNode(NEW_NODE));
+ assertFalse(logicalTopologyContainsNode(NEW_NODE)); //TODO: Is Metastore required to promote node to logical topology?
+
+ return;
+ }
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ assertTrue(physicalTopologyContainsNode(((IgniteImpl) fut.join()).id()));
+ assertTrue(logicalTopologyContainsNode(((IgniteImpl) fut.join()).id()));
+ } finally {
+ stopNode(NEW_NODE); // Stops the node and removes it from clusterNodes, keeping the bookkeeping in sync.
+ }
+ }
+
+ private void checkCreateTable() {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1";
+ String dropTableCommand = "DROP TABLE IF EXISTS tempTbl";
+
+ try {
+ sql(node, null, createTableCommand);
+ } finally {
+ sql(node, null, dropTableCommand);
+ }
+ }
+
+ private void checkTxRO() {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ Transaction roTx = node.transactions().readOnly().begin();
+
+ try {
+ if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) {
+ assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
+
+ return;
+ } else if (clusterNodes.containsKey(DATA_NODE_2) && clusterNodes.size() <= 2 /* no quorum */) {
+ // Fake transaction with a timestamp from the past; the query must go through the spy, not roTx.
+ Transaction tx0 = Mockito.spy(roTx);
+ Mockito.when(tx0.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0));
+ sql(node, tx0, "SELECT * FROM tbl1");
+
+ // The real transaction carries a recent timestamp, so the same read is expected to fail.
+ assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
+
+ return;
+ }
+
+ sql(node, roTx, "SELECT * FROM tbl1");
+ } finally {
+ roTx.rollback();
+ }
+ }
+
+ public void checkImplicitTx() {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (clusterNodes.size() <= 2 || !clusterNodes.containsKey(DATA_NODE)) {
+ assertThrowsWithCause(
+ () -> sql(node, null, "INSERT INTO tbl1 VALUES (2, -2)"),
+ IgniteException.class,
+ "Failed to get the primary replica");
+
+ return;
+ }
+
+ sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)");
+
+ try {
+ assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
+ } finally {
+ sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
+ }
+ }
+
+ private void checkTxRW() {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (clusterNodes.size() <= 2 || !clusterNodes.containsKey(DATA_NODE)) {
+ Transaction tx = node.transactions().begin();
+ try {
+ assertThrowsWithCause(
+ () -> sql(node, tx, "INSERT INTO tbl1 VALUES (2, -2)"),
+ IgniteException.class,
+ "Failed to get the primary replica");
+ } finally {
+ tx.rollback();
+ }
+
+ return;
+ }
+
+ Transaction tx = node.transactions().begin();
+ try {
+ try {
+ sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)");
+
+ tx.commit();
+ } finally {
+// tx.rollback();
+ }
+
+ assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
+
+ } finally {
+ sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
+ }
+ }
+
+ /** Find started cluster node or return {@code null} if not found. */
+ private @Nullable Ignite initializedNode() {
+ assert !clusterNodes.isEmpty();
+
+ CompletableFuture<Ignite> nodeFut = clusterNodes.values().iterator().next();
+
+ if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
+ assertThrowsWithCause(() -> nodeFut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+ clusterNodes.forEach((k, v) -> assertNull(v.getNow(null), k));
+
+ return null;
+ }
+
+ assertThat(nodeFut, willCompleteSuccessfully());
+
+ return nodeFut.join();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java
index 86e194d014..9939a095b4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.cluster;
-import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.micronaut.configuration.picocli.MicronautFactory;
@@ -29,44 +29,22 @@ import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.lang.reflect.Method;
-import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.sql.engine.QueryContext;
-import org.apache.ignite.internal.sql.engine.QueryProperty;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.tx.Transaction;
@@ -76,505 +54,311 @@ import org.hamcrest.text.IsEmptyString;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DynamicContainer;
-import org.junit.jupiter.api.DynamicNode;
-import org.junit.jupiter.api.DynamicTest;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestFactory;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import picocli.CommandLine;
/**
- * Test node restart in different scenarios and validate grid components behavior depending on availability/absence of quorums.
+ * Test node start/stop in different scenarios and validate grid components behavior depending on availability/absence of quorums.
*/
// TODO: Fix expected messages in assertThrows
// TODO: Create 2 distribution zones, which spans a single node and both data nodes, and a tables in these zones.
-@ExtendWith(WorkDirectoryExtension.class)
-@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true")
-public class ItNodeRestartTest extends BaseIgniteAbstractTest {
- public static final int NODE_JOIN_WAIT_TIMEOUT = 5;
- public static final Runnable NOOP = () -> {
- };
-
- /** Work directory. */
- @WorkDirectory
- private static Path WORK_DIR;
-
- private static final String connectionAddr = "\"localhost:3344\", \"localhost:3345\", \"localhost:3346\"";
-
- /** Correct ignite cluster url. */
- protected static final String NODE_URL = "http://localhost:10300";
-
- /** Cluster management group node name. */
- private static final String CMG_NODE = "node1";
- /** MetaStorage group node name. */
- private static final String METASTORAGE_NODE = "node3";
- /** Data node 1 name. */
- private static final String DATA_NODE = "node2"; // Partition leader.
- /** Data node 2 name. */
- private static final String DATA_NODE_2 = "node4";
- /** New node name. */
- private static final String NEW_NODE = "newNode";
-
- /** Nodes configurations. */
- private static final Map<String, String> nodesCfg = Map.of(
- "node1", "{\n"
- + " \"network\": {\n"
- + " \"port\":3344,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node2", "{\n"
- + " \"network\": {\n"
- + " \"port\":3345,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node3", "{\n"
- + " \"network\": {\n"
- + " \"port\":3346,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node4", "{\n"
- + " \"network\": {\n"
- + " \"port\":3347,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}"
- );
-
- private static final String NEW_NODE_CONFIG = "{\n"
- + " \"network\": {\n"
- + " \"port\":3348,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}";
-
- // TODO: Drop labels and make node names meaningful, when distribution zones will be implemented.
- // Labels are used for better understanding node role.
- // Node names uses for partition affinity calculation, we can't guarantee a node with name "DATA_NODE" will own a partition.
- // So, static meaningless node names are used, which are mapped to labels.
- private static final Map<String, String> nodeLabels = Map.of(
- CMG_NODE, "C",
- METASTORAGE_NODE, "M",
- DATA_NODE, "D",
- DATA_NODE_2, "D2",
- NEW_NODE, "N"
- );
-
- /** Cluster nodes. */
- private final Map<String, Future<Ignite>> clusterNodes = new HashMap<>();
-
- /** Runs after each test sequence. */
+@SuppressWarnings("ThrowableNotThrown")
+public class ItNodeRestartTest extends AbstractClusterStartStopTest {
@BeforeEach
- public void beforeEach() {
- List<CompletableFuture<Ignite>> futures = new ArrayList<>();
-
- // Start nodes.
- for (Entry<String, String> entry : nodesCfg.entrySet()) {
- String nodeName = entry.getKey();
- String nodeConfig = entry.getValue();
-
- futures.add(IgnitionManager.start(nodeName, nodeConfig, WORK_DIR.resolve(nodeName)));
+ public void before() throws Exception {
+ for (String name : nodesCfg.keySet()) {
+ IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
}
- // Init cluster.
- IgnitionManager.init(CMG_NODE, List.of(METASTORAGE_NODE), List.of(CMG_NODE), "cluster");
-
- assertThat(CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)), willCompleteSuccessfully());
-
- // Create tables.
- IgniteImpl node = (IgniteImpl) futures.get(0).join();
- sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH partitions = 1");
+ initGrid(nodeAliasToNameMapping.values());
- sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)");
-
- // Shutdown cluster.
- for (int i = futures.size() - 1; i >= 0; i--) {
- IgnitionManager.stop(futures.get(i).join().name());
- }
+ stopAllNodes();
}
+
/** Runs after each test sequence. */
@AfterEach
public void afterEach() {
- log.info("Stop all nodes.");
-
- clusterNodes.keySet().forEach(IgnitionManager::stop);
- clusterNodes.clear();
+ stopAllNodes();
for (String name : nodesCfg.keySet()) {
IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
}
}
- /** Filter out duplicates and invalid grids. */
- private static BiPredicate<String, Set<String>> nodeFilter() {
- return (nodeName, grid) -> (!grid.isEmpty() || CMG_NODE.equals(nodeName)) // CMG node always starts first.
- && (!DATA_NODE_2.equals(nodeName) || grid.contains(DATA_NODE)); // Data nodes are interchangeable.
- }
-
/**
- * Test factory for testing single node restart.
+ * Generates the node grids that are passed as arguments to the parameterized node restart tests.
*
* @return JUnit tests.
- * @see #checkNodeRestart() ()
*/
- @TestFactory
- public Stream<? extends DynamicNode> nodeRestartTestFactory() {
- return GridGenerator.generateGrids(
- nodesCfg.keySet(),
- nodeFilter() // Data nodes are interchangeable.
- ).stream()
- .map(nodes -> {
- if (nodes.size() != 3) {
- return null; //TODO: remove this
- }
-
- ArrayList<DynamicNode> tests = new ArrayList<>();
- for (int i = 0; i < nodes.size(); i++) {
- String nodeName = nodes.get(i);
-
- boolean last = (i == nodes.size() - 1);
- boolean first = i == 0;
-
- Runnable setup = first ? () -> {
- nodes.forEach(this::startNode);
- stopNode(nodeName);
- } : () -> stopNode(nodeName);
-
- tests.add(createTest(
- "Stopped " + nodeLabels.get(nodeName),
- setup,
- NOOP,
- this::checkNodeRestart
- ));
-
- tests.add(createTest(
- "Started " + nodeLabels.get(nodeName),
- () -> startNode(nodeName),
- last ? this::stopCluster : NOOP,
- this::checkNodeRestart
- ));
- }
-
- return DynamicContainer.dynamicContainer("Grid " +
- clusterNodesToString(nodes), tests);
- })
- .filter(Objects::nonNull);
+ static Object[] generateSequence() {
+ return new GridGenerator(
+ nodeAliasToNameMapping.keySet(),
+ (name, grid) -> (!"D2".equals(name) || grid.contains("D")), // Data nodes are interchangeable.
+ new UniqueSetFilter<String>().and(grid -> grid.size() > 2)
+ ).generate().toArray(Object[]::new);
+ }
+
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testNodeJoin(List<String> nodeNames) {
+ runTest(nodeNames, () -> checkNodeJoin(NEW_NODE));
}
- public void checkNodeStartupSequence() {
- log.info("Node startup sequence test: cluster=[" + String.join(", ", new TreeSet<>(clusterNodes.keySet())) + ']');
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testCreateTable(List<String> nodeNames) {
+ runTest(nodeNames, this::checkCreateTable);
+ }
- validateNodeJoin();
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testImplicitTransaction(List<String> nodeNames) {
+ runTest(nodeNames, this::checkImplicitTx);
+ }
- if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
- return; // There is no node startup future finished, as nodes wait for Metastorage on start.
- }
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testReadWriteTransaction(List<String> nodeNames) {
+ runTest(nodeNames, this::checkTxRW);
+ }
- validateDDL();
- validateROTransaction();
- validateRWTransaction();
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testReadOnlyTransaction(List<String> nodeNames) {
+ runTest(nodeNames, this::checkTxRO);
}
- public void checkNodeRestart() {
- log.info("Node restart test: cluster=[" + String.join(", ", new TreeSet<>(clusterNodes.keySet())) + ']');
+ private void runTest(List<String> nodeNames, Runnable testBody) {
+ Set<String> realNames = nodeNames.stream().map(nodeAliasToNameMapping::get).collect(Collectors.toSet());
- validateNodeJoin();
- validateDDL();
- validateROTransaction();
- validateRWTransaction();
+ for (String name : nodeNames) {
+ try {
+ prestartGrid(realNames);
+
+ log.info("Stopping node: label=" + name + ", name=" + nodeAliasToNameMapping.get(name));
+
+ stopNode(nodeAliasToNameMapping.get(name));
+
+ testBody.run();
+
+ log.info("Starting node back: label=" + name + ", name=" + nodeAliasToNameMapping.get(name));
+
+ startNode(nodeAliasToNameMapping.get(name));
+
+ testBody.run();
+ } finally {
+ stopAllNodes();
+ }
+ }
}
- private void validateNodeJoin() {
+ private void prestartGrid(Set<String> nodeNames) {
+ Set<String> expectedNodes = Set.copyOf(nodeNames);
+
+ // Start CMG and MetaStorage first, to activate cluster.
+ List<CompletableFuture<Ignite>> futs = new ArrayList<>();
+ futs.add(startNode(nodeAliasToNameMapping.get("C")));
+ futs.add(startNode(nodeAliasToNameMapping.get("M")));
+
+ nodeNames.stream()
+ .filter(n -> !isNodeStarted(n))
+ .map(this::startNode)
+ .forEach(futs::add);
+
+ assertThat(CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)), willCompleteSuccessfully());
+
+ // Stop unwanted nodes.
+ futs.stream()
+ .map(f -> f.join().name())
+ .filter(n -> !expectedNodes.contains(n))
+ .forEach(this::stopNode);
+ }
+
+ private void checkNodeJoin(String nodeName) {
try {
- CompletableFuture<Ignite> fut = IgnitionManager.start(NEW_NODE, NEW_NODE_CONFIG, WORK_DIR.resolve(NEW_NODE));
+ CompletableFuture<Ignite> fut = startNode(nodeName);
+
+ if (!isNodeStarted(CMG_NODE)) {
+ assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
- if (!clusterNodes.containsKey(CMG_NODE)) {
- assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.SECONDS), TimeoutException.class);
+ assertTrue(topologyContainsNode("physical", nodeName));
- assertTrue(validateNodeEnterTopology("physical", NEW_NODE));
- assertThrowsWithCause(() -> validateNodeEnterTopology("logical", NEW_NODE), IgniteException.class);
+ // CMG holds logical topology state.
+ assertThrowsWithCause(() -> topologyContainsNode("logical", nodeName), IgniteException.class);
return;
- } else if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
+ } else if (!isNodeStarted(METASTORAGE_NODE)) {
// Node future can't complete as some components requires Metastorage on start.
- assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.SECONDS), TimeoutException.class);
+ assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
- assertTrue(validateNodeEnterTopology("physical", NEW_NODE));
- assertTrue(validateNodeEnterTopology("logical", NEW_NODE));
+ assertTrue(topologyContainsNode("physical", nodeName));
+ // TODO: Is Metastorage required to promote a node to the logical topology?
+ assertFalse(topologyContainsNode("logical", nodeName));
return;
}
assertThat(fut, willCompleteSuccessfully());
- assertTrue(validateNodeEnterTopology("physical", ((IgniteImpl) fut.join()).id()));
- assertTrue(validateNodeEnterTopology("logical", ((IgniteImpl) fut.join()).id()));
+ assertTrue(topologyContainsNode("physical", ((IgniteImpl) fut.join()).id()));
+ assertTrue(topologyContainsNode("logical", ((IgniteImpl) fut.join()).id()));
} finally {
- IgnitionManager.stop(NEW_NODE);
+ IgnitionManager.stop(nodeName);
}
}
- private static boolean validateNodeEnterTopology(String topologyType, String nodeId) {
- StringWriter out = new StringWriter();
- StringWriter err = new StringWriter();
+ private void checkCreateTable() {
+ Ignite node = initializedNode();
- new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST)))
- .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
- .setOut(new PrintWriter(out, true))
- .setErr(new PrintWriter(err, true))
- .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
-
- assertThat(err.toString(), IsEmptyString.emptyString());
-
- return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find();
- }
+ if (node == null) {
+ return;
+ }
- private void validateDDL() {
String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1";
String dropTableCommand = "DROP TABLE IF EXISTS tempTbl";
- Ignite node = getNode();
-
try {
- if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
- assertThrowsWithCause(() -> sql(node, null, createTableCommand), IgniteException.class);
- return;
- }
-
sql(node, null, createTableCommand);
} finally {
sql(node, null, dropTableCommand);
}
}
- public void validateROTransaction() {
- Ignite node = getNode();
+ private void checkTxRO() {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
Transaction roTx = node.transactions().readOnly().begin();
try {
- if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (!isNodeStarted(DATA_NODE) && !isNodeStarted(DATA_NODE_2)) {
assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
return;
- } else if (!clusterNodes.containsKey(DATA_NODE_2)) {
- // Use fake transaction with a timestamp from the past.
+ } else if (!isNodeStarted(DATA_NODE_2)) {
+ // Fake transaction with a timestamp from the past.
Transaction tx0 = Mockito.spy(roTx);
Mockito.when(tx0.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0));
+ sql(node, roTx, "SELECT * FROM tbl1");
- assertThrowsWithCause(() -> sql(node, tx0, "SELECT * FROM tbl1"), IgniteException.class);
+ // Transaction with recent timestamp.
+ assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
return;
}
- assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
+ sql(node, roTx, "SELECT * FROM tbl1");
} finally {
roTx.rollback();
}
}
- public void validateImplicitRWTransaction() {
- Ignite node = getNode();
-
- if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
- assertThrowsWithCause(
- () -> sql(node, null, "INSERT INTO tbl1 VALUES (5, 5)"),
- TransactionException.class,
- "Failed to get the primary replica");
+ public void checkImplicitTx() {
+ Ignite node = initializedNode();
+ if (node == null) {
return;
}
- sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)");
-
- assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
+ // TODO: Bind the table's distribution zone to the data nodes and uncomment the condition below.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (clusterNodes.size() <= 2 || !isNodeStarted(DATA_NODE)) {
- sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
- }
-
- public void validateRWTransaction() {
- Ignite node = getNode();
-
- if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
- Transaction tx = node.transactions().readOnly().begin();
try {
assertThrowsWithCause(
- () -> sql(node, tx, "INSERT INTO tbl1 VALUES (5, 5)"),
+ () -> sql(node, null, "INSERT INTO tbl1 VALUES (2, -2)"),
TransactionException.class,
"Failed to get the primary replica");
-
- return;
} finally {
- tx.rollback();
+ sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
}
- }
- Transaction tx = node.transactions().readOnly().begin();
- try {
- sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)");
+ return;
+ }
- tx.commit();
+ sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)");
+ try {
assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
} finally {
- tx.rollback();
sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
}
}
- /** Get cluster node. */
- private Ignite getNode() {
- try {
- if (clusterNodes.containsKey(DATA_NODE)) {
- clusterNodes.get(DATA_NODE).get();
- } else if (clusterNodes.containsKey(DATA_NODE_2)) {
- clusterNodes.get(DATA_NODE_2).get();
- }
+ private void checkTxRW() {
+ Ignite node = initializedNode();
- return clusterNodes.values().iterator().next().get();
- } catch (Throwable th) {
- IgniteTestUtils.sneakyThrow(th);
-
- return null;
+ if (node == null) {
+ return;
}
- }
- /**
- * Creates JUnit test node.
- *
- * @param testName Test name.
- * @param setUpRunnable SetUp action.
- * @param tearDownRunnable TearDown action.
- * @param testRunnable Test action.
- * @return JUnit test node.
- */
- private DynamicTest createTest(String testName, Runnable setUpRunnable, Runnable tearDownRunnable, Runnable testRunnable) {
- return DynamicTest.dynamicTest(testName, () -> {
- TestInfoImpl info = new TestInfoImpl(testName);
+ if (!isNodeStarted(DATA_NODE)) {
+ Transaction tx = node.transactions().begin();
try {
- setupBase(info, WORK_DIR);
- setUpRunnable.run();
-
- testRunnable.run();
+ assertThrowsWithCause(
+ () -> sql(node, tx, "INSERT INTO tbl1 VALUES (2, -2)"),
+ TransactionException.class,
+ "Failed to get the primary replica");
} finally {
- tearDownRunnable.run();
- tearDownBase(info);
+ tx.rollback();
+ sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
}
- });
- }
-
- private static String clusterNodesToString(List<String> nodes) {
- return '[' + nodes.stream().map(nodeLabels::get).collect(Collectors.joining(", ")) + ']';
- }
-
- private void startNode(String nodeName) {
- CompletableFuture<Ignite> fut = IgnitionManager.start(nodeName, nodesCfg.get(nodeName), WORK_DIR.resolve(nodeName));
- clusterNodes.put(nodeName, fut);
- }
-
- private void stopNode(String nodeName) {
- Future<?> rmv = clusterNodes.remove(nodeName);
-
- assert rmv != null;
-
- IgnitionManager.stop(nodeName);
- }
-
- private void stopCluster() {
- clusterNodes.keySet().forEach(IgnitionManager::stop);
- clusterNodes.clear();
- }
-
- protected static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) {
- var queryEngine = ((IgniteImpl) node).queryEngine();
-
- SessionId sessionId = queryEngine.createSession(5_000, PropertiesHolder.fromMap(
- Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
- ));
+ return;
+ }
+ Transaction tx = node.transactions().begin();
try {
- var context = tx != null ? QueryContext.of(tx) : QueryContext.of();
+ sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)");
- return getAllFromCursor(
- await(queryEngine.querySingleAsync(sessionId, context, sql, args))
- );
+ tx.commit();
+
+ assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
} finally {
- queryEngine.closeSession(sessionId);
+ tx.rollback();
+ sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
}
}
- /**
- * Grids configurations generator.
- */
- static class GridGenerator {
- static List<List<String>> generateStartupSequences(Collection<String> nodes, BiPredicate<String, Set<String>> nodeFilter) {
- return new GridGenerator(nodeFilter, grid -> grid.size() == nodes.size()).generate(nodes);
- }
-
- static List<List<String>> generateGrids(Collection<String> nodes, BiPredicate<String, Set<String>> nodeFilter) {
- Predicate<Set<String>> filter = new UniqueSetFilter<>();
- filter = filter.and(grid -> grid.size() > 1);
-
- return new GridGenerator(nodeFilter, filter).generate(nodes);
- }
-
- private final LinkedHashSet<String> currentGrid = new LinkedHashSet<>();
- private final List<List<String>> gridStartSequences = new ArrayList<>();
- private final BiPredicate<String, Set<String>> nodeFilter;
- private final Predicate<Set<String>> gridFilter;
-
- private GridGenerator(BiPredicate<String, Set<String>> nodeFilter, Predicate<Set<String>> gridFilter) {
- this.nodeFilter = nodeFilter;
- this.gridFilter = gridFilter;
- }
+ private static boolean topologyContainsNode(String topologyType, String nodeId) {
+ StringWriter out = new StringWriter();
+ StringWriter err = new StringWriter();
- /** Generates tests execution sequence recursively. */
- List<List<String>> generate(Collection<String> nodes) {
- generate0(nodes);
+ new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST)))
+ .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
+ .setOut(new PrintWriter(out, true))
+ .setErr(new PrintWriter(err, true))
+ .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
- return gridStartSequences;
- }
+ assertThat(err.toString(), IsEmptyString.emptyString());
- /** Generates tests execution sequence recursively. */
- private void generate0(Collection<String> availableNodes) {
- if (gridFilter.test(currentGrid)) {
- gridStartSequences.add(new ArrayList<>(currentGrid)); // Copy mutable collection.
- }
+ return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find();
+ }
- for (String node : availableNodes) {
- if (!nodeFilter.test(node, currentGrid)) {
- continue; // Skip node from adding to the current grid.
- }
+ /** Returns a started cluster node, or {@code null} if Metastorage is down and thus no node can complete its startup. */
+ private @Nullable Ignite initializedNode() {
+ assert !clusterNodes.isEmpty();
- currentGrid.add(node);
+ CompletableFuture<Ignite> nodeFut = clusterNodes.values().iterator().next();
- HashSet<String> unusedNodes = new HashSet<>(availableNodes);
- unusedNodes.remove(node);
+ if (!isNodeStarted(METASTORAGE_NODE)) {
+ assertThrowsWithCause(() -> nodeFut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
- generate(unusedNodes);
+ clusterNodes.forEach((k, v) -> assertNull(v.getNow(null), k));
- currentGrid.remove(node);
- }
+ return null;
}
+ return nodeFut.join();
}
+
/** Filters out non-unique sets. */
private static class UniqueSetFilter<T> implements Predicate<Set<T>> {
final Set<Set<T>> seenBefore = new HashSet<>();
@@ -584,33 +368,4 @@ public class ItNodeRestartTest extends BaseIgniteAbstractTest {
return seenBefore.add(new HashSet<>(s)); // Copy mutable collection.
}
}
-
- /** Test info implementation for dynamic tests. */
- static class TestInfoImpl implements TestInfo {
- private final String name;
-
- TestInfoImpl(String name) {
- this.name = name;
- }
-
- @Override
- public String getDisplayName() {
- return name;
- }
-
- @Override
- public Set<String> getTags() {
- return Set.of();
- }
-
- @Override
- public Optional<Class<?>> getTestClass() {
- return Optional.of(ItNodeRestartTest.class);
- }
-
- @Override
- public Optional<Method> getTestMethod() {
- return Optional.empty();
- }
- }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java
deleted file mode 100644
index 425baa80ff..0000000000
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.cluster;
-
-import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import io.micronaut.configuration.picocli.MicronautFactory;
-import io.micronaut.context.ApplicationContext;
-import io.micronaut.context.env.Environment;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiPredicate;
-import java.util.function.Predicate;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgnitionManager;
-import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.ReplicaManager;
-import org.apache.ignite.internal.sql.engine.QueryContext;
-import org.apache.ignite.internal.sql.engine.QueryProperty;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionException;
-import org.hamcrest.Matchers;
-import org.hamcrest.text.IsEmptyString;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assumptions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DynamicContainer;
-import org.junit.jupiter.api.DynamicNode;
-import org.junit.jupiter.api.DynamicTest;
-import org.junit.jupiter.api.TestFactory;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mockito;
-import picocli.CommandLine;
-
-/**
- * Test node start/stop in different scenarios and validate grid components behavior depending on availability/absence of quorums.
- */
-//TODO: Fix expected messages in assertThrows
-// TODO: Create 2 distribution zones, which spans a single node and both data nodes, and a tables in these zones.
-@SuppressWarnings("ThrowableNotThrown")
-@ExtendWith(WorkDirectoryExtension.class)
-@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true")
-public class ItNodeStartStopTest extends BaseIgniteAbstractTest {
- public static final int NODE_JOIN_WAIT_TIMEOUT = 500;
- public static final Runnable NOOP = () -> {
- };
-
- /** Work directory. */
- @WorkDirectory
- private static Path WORK_DIR;
-
- private static final String connectionAddr = "\"localhost:3344\", \"localhost:3345\", \"localhost:3346\"";
-
- /** Correct ignite cluster url. */
- protected static final String NODE_URL = "http://localhost:10300";
-
- /** Cluster management group node name. */
- private static final String CMG_NODE = "node1";
- /** MetaStorage group node name. */
- private static final String METASTORAGE_NODE = "node3";
- /** Data node 1 name. */
- private static final String DATA_NODE = "node2"; // Partition leader.
- /** Data node 2 name. */
- private static final String DATA_NODE_2 = "node4";
- /** New node name. */
- private static final String NEW_NODE = "newNode";
-
- /** Nodes configurations. */
- private static final Map<String, String> nodesCfg = Map.of(
- "node1", "{\n"
- + " \"network\": {\n"
- + " \"port\":3344,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node2", "{\n"
- + " \"network\": {\n"
- + " \"port\":3345,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node3", "{\n"
- + " \"network\": {\n"
- + " \"port\":3346,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}",
- "node4", "{\n"
- + " \"network\": {\n"
- + " \"port\":3347,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}"
- );
-
- private static final String NEW_NODE_CONFIG = "{\n"
- + " \"network\": {\n"
- + " \"port\":3348,\n"
- + " \"nodeFinder\":{\n"
- + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
- + " }\n"
- + " }\n"
- + "}";
-
- // TODO: Drop labels and make node names meaningful, when distribution zones will be implemented.
- // Labels are used for better understanding node role.
- // Node names uses for partition affinity calculation, we can't guarantee a node with name "DATA_NODE" will own a partition.
- // So, static meaningless node names are used, which are mapped to labels.
- private static final Map<String, String> nodeLabels = Map.of(
- CMG_NODE, "C",
- METASTORAGE_NODE, "M",
- DATA_NODE, "D",
- DATA_NODE_2, "D2",
- NEW_NODE, "N"
- );
-
- /** Cluster nodes. */
- private final Map<String, CompletableFuture<Ignite>> clusterNodes = new HashMap<>();
-
- /** Runs after each test sequence. */
- @BeforeEach
- public void before() throws Exception {
- assert clusterNodes.isEmpty();
-
- for (String name : nodesCfg.keySet()) {
- IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
- }
-
- List<CompletableFuture<Ignite>> futures = new ArrayList<>();
-
- // Start nodes.
- nodesCfg.forEach((k, v) -> futures.add(IgnitionManager.start(k, v, WORK_DIR.resolve(k))));
-
- // Init cluster.
- //TODO: Use dedicated node for metastorage.
- IgnitionManager.init(CMG_NODE, List.of(METASTORAGE_NODE), List.of(CMG_NODE), "cluster");
-
- for (CompletableFuture<Ignite> future : futures) {
- assertThat(future, willCompleteSuccessfully());
- }
-
- // Create tables.
- IgniteImpl node = (IgniteImpl) futures.get(0).join();
- sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH partitions = 1");
-
- for (CompletableFuture<Ignite> f : futures) {
- ReplicaManager replicaMgr = (ReplicaManager) MethodHandles.privateLookupIn(IgniteImpl.class, MethodHandles.lookup())
- .findVarHandle(IgniteImpl.class, "replicaMgr", ReplicaManager.class)
- .get(f.get());
-
- assertTrue(DATA_NODE.equals(f.get().name()) ^ replicaMgr.startedGroups().isEmpty());
- }
-
- sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)");
-
- // Shutdown cluster.
- for (int i = futures.size() - 1; i >= 0; i--) {
- IgnitionManager.stop(futures.get(i).join().name());
- }
- }
-
- /** Runs after each test sequence. */
- @AfterEach
- public void afterEach() throws IOException {
- log.info("Stop all nodes.");
-
- clusterNodes.keySet().forEach(IgnitionManager::stop);
- clusterNodes.clear();
-
- for (String name : nodesCfg.keySet()) {
- IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
- }
- }
-
- /** Filter out duplicates and invalid grids. */
- private static BiPredicate<String, Set<String>> nodeFilter() {
- return (nodeName, grid) -> (!grid.isEmpty() || CMG_NODE.equals(nodeName)) // CMG node always starts first.
- && (!DATA_NODE_2.equals(nodeName) || grid.contains(DATA_NODE)); // Data nodes are interchangeable.
- }
-
- /**
- * Test factory for testing node startup order.
- *
- * @return JUnit tests.
- */
- @TestFactory
- public Stream<DynamicNode> gridStartupTestFactory() {
- return GridGenerator.generateStartupSequences(
- nodesCfg.keySet(),
- nodeFilter()
- ).stream()
- .map(nodes -> {
- List<DynamicNode> scenarioSteps = new ArrayList<>();
- List<String> startedNodes = new ArrayList<>();
-
- for (int i = 0; i < nodes.size(); i++) {
- String nodeName = nodes.get(i);
- startedNodes.add(nodeName);
-
- boolean first = (i == 0);
- boolean last = (i >= (nodes.size() - 1));
-
- Runnable setup = () -> {
- assert !first || clusterNodes.isEmpty();
-
- startNode(nodeName);
- };
-
- Runnable tearDown = last ? () -> stopCluster(nodes) : NOOP;
-
- scenarioSteps.add(DynamicContainer.dynamicContainer(
- "Started " + clusterNodesToString(startedNodes),
- List.of(
- createDynamicTest("checkNewNodeJoin", setup, this::checkNewNodeJoin, NOOP),
- createDynamicTest("checkDDL", NOOP, this::checkDDL, NOOP),
- createDynamicTest("checkROTransaction", NOOP, this::checkROTransacton, NOOP),
-// createTest("validateRWTransaction", NOOP, NOOP, this::validateRWTransaction),
- createDynamicTest("checkImplicitRWTransaction", NOOP, this::checkImplicitRWTransaction, tearDown)
- )
- ));
- }
-
- return DynamicContainer.dynamicContainer("Start sequence " + clusterNodesToString(nodes), scenarioSteps);
- });
- }
-
- private void checkNewNodeJoin() {
- try {
- CompletableFuture<Ignite> fut = IgnitionManager.start(NEW_NODE, NEW_NODE_CONFIG, WORK_DIR.resolve(NEW_NODE));
-
- if (!clusterNodes.containsKey(CMG_NODE)) {
- assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
-
- assertTrue(validateNodeEnterTopology("physical", NEW_NODE));
-
- // CMG holds logical topology state.
- assertThrowsWithCause(() -> validateNodeEnterTopology("logical", NEW_NODE), IgniteException.class);
-
- return;
- } else if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
- // Node future can't complete as some components requires Metastorage on start.
- assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
-
- assertTrue(validateNodeEnterTopology("physical", NEW_NODE));
- assertFalse(
- validateNodeEnterTopology("logical", NEW_NODE)); //TODO: Is Metastore required to promote node to logical topology?
-
- return;
- }
-
- assertThat(fut, willCompleteSuccessfully());
-
- assertTrue(validateNodeEnterTopology("physical", ((IgniteImpl) fut.join()).id()));
- assertTrue(validateNodeEnterTopology("logical", ((IgniteImpl) fut.join()).id()));
- } finally {
- IgnitionManager.stop(NEW_NODE);
- }
- }
-
- private static boolean validateNodeEnterTopology(String topologyType, String nodeId) {
- StringWriter out = new StringWriter();
- StringWriter err = new StringWriter();
-
- new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST)))
- .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
- .setOut(new PrintWriter(out, true))
- .setErr(new PrintWriter(err, true))
- .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
-
- assertThat(err.toString(), IsEmptyString.emptyString());
-
- return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find();
- }
-
- public void checkDDL() {
- String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1";
- String dropTableCommand = "DROP TABLE IF EXISTS tempTbl";
-
- Ignite node = getNode();
-
- try {
- sql(node, null, createTableCommand);
- } finally {
- sql(node, null, dropTableCommand);
- }
- }
-
- public void checkROTransacton() {
- Ignite node = getNode();
- Transaction roTx = node.transactions().readOnly().begin();
-
- try {
- if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) {
- assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class);
-
- return;
- } else if (!clusterNodes.containsKey(DATA_NODE_2)) {
- // Use fake transaction with a timestamp from the past.
- Transaction tx0 = Mockito.spy(roTx);
- Mockito.when(tx0.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0));
-
-// assertThrowsWithCause(() -> sql(node, tx0, "SELECT * FROM tbl1"), IgniteException.class);
-
- sql(node, roTx, "SELECT * FROM tbl1");
-
- return;
- }
-
- sql(node, roTx, "SELECT * FROM tbl1");
- } finally {
- roTx.rollback();
- }
- }
-
- public void checkImplicitRWTransaction() {
- Ignite node = getNode();
-
- // TODO: Bound table distribution zone to data nodes and uncomment.
- // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
- if (clusterNodes.size() <= 2 || !clusterNodes.containsKey(DATA_NODE)) {
-
- try {
- assertThrowsWithCause(
- () -> sql(node, null, "INSERT INTO tbl1 VALUES (2, -2)"),
- TransactionException.class,
- "Failed to get the primary replica");
- } finally {
- sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
- }
-
- return;
- }
-
- sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)");
-
- try {
- assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
- } finally {
- sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
- }
- }
-
-    /**
-     * Runs an INSERT inside an explicit read-write transaction and checks the outcome against the current cluster state.
-     *
-     * <p>If {@code DATA_NODE} or {@code DATA_NODE_2} is not started, the INSERT must fail with a
-     * {@code TransactionException}; otherwise the transaction is committed and the table must contain two rows. The
-     * transaction is rolled back and the inserted row is deleted afterwards in both cases.
-     */
-    public void validateRWTransaction() {
-        Ignite node = getNode();
-
-        // The INSERT needs a read-write transaction: a read-only one cannot exercise the RW path that is checked here.
-        Transaction tx = node.transactions().begin();
-
-        try {
-            if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
-                assertThrowsWithCause(
-                        () -> sql(node, tx, "INSERT INTO tbl1 VALUES (2, -2)"),
-                        TransactionException.class,
-                        "Failed to get the primary replica");
-
-                return;
-            }
-
-            sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)");
-
-            tx.commit();
-
-            assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2));
-        } finally {
-            try {
-                tx.rollback();
-            } finally {
-                // Remove the row even if the rollback fails, so that later checks see a clean table.
-                sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
-            }
-        }
-    }
-
-    /**
-     * Returns the Ignite instance behind the first start future found in {@code clusterNodes}.
-     *
-     * <p>If {@code METASTORAGE_NODE} is started the future is joined without a timeout, otherwise the wait is limited to
-     * {@code NODE_JOIN_WAIT_TIMEOUT} milliseconds. On timeout the test is aborted through a JUnit assumption when
-     * {@code METASTORAGE_NODE} is not started.
-     */
-    private @NotNull Ignite getNode() {
-        try {
-            CompletableFuture<Ignite> firstNodeFut = clusterNodes.values().iterator().next();
-
-            return clusterNodes.containsKey(METASTORAGE_NODE)
-                    ? firstNodeFut.join()
-                    : firstNodeFut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
-        } catch (TimeoutException timeout) {
-            Assumptions.assumeTrue(clusterNodes.containsKey(METASTORAGE_NODE),
-                    "No Ignite object instances available, because node initialization requires Metastorage.");
-
-            IgniteTestUtils.sneakyThrow(timeout);
-        } catch (Throwable failure) {
-            IgniteTestUtils.sneakyThrow(failure);
-        }
-
-        // Presumably never reached, as sneakyThrow is expected to rethrow; the statement satisfies the compiler.
-        return null;
-    }
-
-    /**
-     * Creates JUnit test node.
-     *
-     * <p>The produced test runs the base set-up, {@code setUpRunnable} and {@code testRunnable}, and then always runs
-     * {@code tearDownRunnable} followed by the base tear-down, even if one of the previous steps has failed.
-     *
-     * @param testName Test name.
-     * @param setUpRunnable SetUp action.
-     * @param testRunnable Test action.
-     * @param tearDownRunnable TearDown action.
-     * @return JUnit test node.
-     */
-    private DynamicTest createDynamicTest(String testName, Runnable setUpRunnable, Runnable testRunnable, Runnable tearDownRunnable) {
-        return DynamicTest.dynamicTest(testName, () -> {
-            TestInfoImpl info = new TestInfoImpl(testName);
-
-            try {
-                setupBase(info, WORK_DIR);
-                setUpRunnable.run();
-
-                testRunnable.run();
-            } finally {
-                try {
-                    tearDownRunnable.run();
-                } finally {
-                    // The base tear-down must not be skipped when the test-specific tear-down throws.
-                    tearDownBase(info);
-                }
-            }
-        });
-    }
-
-    /** Renders the labels of the given nodes (looked up in {@code nodeLabels}) as a bracketed, comma-separated list. */
-    private static String clusterNodesToString(Collection<String> nodes) {
-        return nodes.stream()
-                .map(nodeLabels::get)
-                .collect(Collectors.joining(", ", "[", "]"));
-    }
-
-    /** Starts the node with its configuration from {@code nodesCfg} and registers the start future in {@code clusterNodes}. */
-    private void startNode(String nodeName) {
-        CompletableFuture<Ignite> startFut = IgnitionManager.start(nodeName, nodesCfg.get(nodeName), WORK_DIR.resolve(nodeName));
-
-        clusterNodes.put(nodeName, startFut);
-    }
-
-    /** Unregisters the node from {@code clusterNodes} (it must have been registered) and stops it. */
-    private void stopNode(String nodeName) {
-        boolean wasRegistered = clusterNodes.remove(nodeName) != null;
-
-        assert wasRegistered;
-
-        IgnitionManager.stop(nodeName);
-    }
-
-    /** Stops the given nodes from the last list element to the first; no registered node may remain afterwards. */
-    private void stopCluster(List<String> nodes) {
-        for (int remaining = nodes.size(); remaining > 0; remaining--) {
-            stopNode(nodes.get(remaining - 1));
-        }
-
-        assert clusterNodes.isEmpty();
-    }
-
-    /**
-     * Executes a single SQL statement through the query engine of the given node and returns all rows of the result.
-     *
-     * <p>A dedicated session with the {@code PUBLIC} default schema is created for the statement and closed afterwards.
-     *
-     * @param node Node to run the statement on.
-     * @param tx Transaction to run the statement in, or {@code null} to run without an explicit transaction.
-     * @param sql Statement text.
-     * @param args Statement arguments.
-     * @return All rows read from the result cursor.
-     */
-    protected List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) {
-        var engine = ((IgniteImpl) node).queryEngine();
-
-        var sessionProps = PropertiesHolder.fromMap(Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC"));
-
-        SessionId sessionId = engine.createSession(5_000, sessionProps);
-
-        try {
-            QueryContext context;
-
-            if (tx == null) {
-                context = QueryContext.of();
-            } else {
-                context = QueryContext.of(tx);
-            }
-
-            var cursor = await(engine.querySingleAsync(sessionId, context, sql, args));
-
-            return getAllFromCursor(cursor);
-        } finally {
-            engine.closeSession(sessionId);
-        }
-    }
-
-    /**
-     * Grids configurations generator.
-     *
-     * <p>Walks all node orderings depth-first: a node is appended to the current grid only if the node filter accepts it for
-     * that grid, and every intermediate grid accepted by the grid filter is recorded as a start sequence.
-     */
-    static class GridGenerator {
-        /** Nodes of the grid being built, in the order they were added. */
-        private final LinkedHashSet<String> currentGrid = new LinkedHashSet<>();
-
-        /** Start sequences accepted so far. */
-        private final List<List<String>> gridStartSequences = new ArrayList<>();
-
-        /** Decides whether a node may be added to the given current grid. */
-        private final BiPredicate<String, Set<String>> nodeFilter;
-
-        /** Decides whether the current grid is recorded as a start sequence. */
-        private final Predicate<Set<String>> gridFilter;
-
-        private GridGenerator(BiPredicate<String, Set<String>> nodeFilter, Predicate<Set<String>> gridFilter) {
-            this.nodeFilter = nodeFilter;
-            this.gridFilter = gridFilter;
-        }
-
-        /** Generates the start sequences that pass the node filter and contain as many nodes as {@code nodes} has. */
-        static List<List<String>> generateStartupSequences(Collection<String> nodes, BiPredicate<String, Set<String>> nodeFilter) {
-            Predicate<Set<String>> fullGridOnly = grid -> grid.size() == nodes.size();
-
-            return new GridGenerator(nodeFilter, fullGridOnly).generate(nodes);
-        }
-
-        /** Generates one start sequence per distinct set of more than one node, using the first ordering encountered. */
-        static List<List<String>> generateGrids(Collection<String> nodes, BiPredicate<String, Set<String>> nodeFilter) {
-            Predicate<Set<String>> firstOccurrence = new UniqueSetFilter<>();
-
-            return new GridGenerator(nodeFilter, firstOccurrence.and(grid -> grid.size() > 1)).generate(nodes);
-        }
-
-        /** Generates tests execution sequences for the given nodes and returns everything collected by this generator. */
-        List<List<String>> generate(Collection<String> nodes) {
-            generate0(nodes);
-
-            return gridStartSequences;
-        }
-
-        /** Records the current grid if it is accepted, then recursively extends it with every allowed node. */
-        private void generate0(Collection<String> availableNodes) {
-            if (gridFilter.test(currentGrid)) {
-                gridStartSequences.add(new ArrayList<>(currentGrid)); // Copy mutable collection.
-            }
-
-            for (String candidate : availableNodes) {
-                if (nodeFilter.test(candidate, currentGrid)) {
-                    currentGrid.add(candidate);
-
-                    Set<String> remainingNodes = new HashSet<>(availableNodes);
-                    remainingNodes.remove(candidate);
-
-                    generate0(remainingNodes);
-
-                    // Backtrack before trying the next candidate.
-                    currentGrid.remove(candidate);
-                }
-            }
-        }
-    }
-
-    /** Stateful predicate that accepts a set only the first time an equal set is tested. */
-    private static class UniqueSetFilter<T> implements Predicate<Set<T>> {
-        /** Snapshots of every set tested so far. */
-        final Set<Set<T>> seenBefore = new HashSet<>();
-
-        @Override
-        public boolean test(Set<T> s) {
-            // Store a copy: the caller keeps mutating the set it passes in.
-            Set<T> snapshot = new HashSet<>(s);
-
-            return seenBefore.add(snapshot);
-        }
-    }
-
- /** Test info implementation for dynamic tests. */
- static class TestInfoImpl implements TestInfo {
- private final String name;
-
- TestInfoImpl(String name) {
- this.name = name;
- }
-
- @Override
- public String getDisplayName() {
- return name;
- }
-
- @Override
- public Set<String> getTags() {
- return Set.of();
- }
-
- @Override
- public Optional<Class<?>> getTestClass() {
- return Optional.of(ItNodeStartStopTest.class);
- }
-
- @Override
- public Optional<Method> getTestMethod() {
- return Optional.empty();
- }
- }
-}