You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/12/21 10:47:46 UTC
[ignite-3] 01/06: Add tests.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-18323
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 50aceb897af9b53f5f5d54e3ae8cb5c01faf4d81
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Nov 22 15:24:57 2022 +0300
Add tests.
---
modules/runner/build.gradle | 1 +
modules/runner/pom.xml | 5 +
.../cluster/AbstractClusterStartStopTest.java | 374 ++++++++++++++++++
.../internal/cluster/ItClusterStartupTest.java | 370 ++++++++++++++++++
.../ignite/internal/cluster/ItNodeRestartTest.java | 417 +++++++++++++++++++++
5 files changed, 1167 insertions(+)
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 762e6bca31..b55d85a201 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -86,6 +86,7 @@ dependencies {
integrationTestImplementation project(':ignite-page-memory')
integrationTestImplementation project(':ignite-raft-api')
integrationTestImplementation project(':ignite-client')
+ integrationTestImplementation project(':ignite-cli')
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index ac460eefd3..6f3f506698 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -140,6 +140,11 @@
</dependency>
<!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-cli</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java
new file mode 100644
index 0000000000..baaa8dc9b1
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/AbstractClusterStartStopTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.micronaut.configuration.picocli.MicronautFactory;
+import io.micronaut.context.ApplicationContext;
+import io.micronaut.context.env.Environment;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.tx.Transaction;
+import org.hamcrest.text.IsEmptyString;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.extension.ExtendWith;
+import picocli.CommandLine;
+
+/**
+ * Base class for cluster start/stop scenarios.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true")
+abstract class AbstractClusterStartStopTest extends BaseIgniteAbstractTest {
+ protected static final Supplier<Integer> UNIQ_INT = () -> ThreadLocalRandom.current().nextInt();
+
+ /** Timeout to wait for node join. */
+ protected static final int OPERATION_WAIT_TIMEOUT = 20_000;
+
+ /** Work directory. */
+ @WorkDirectory
+ protected static Path WORK_DIR;
+
+ /** Addresses for Node finder. */
+ protected static final String connectionAddr = "\"localhost:3344\", \"localhost:3345\", \"localhost:3346\"";
+
+ /** Correct ignite cluster url. */
+ protected static final String NODE_URL = "http://localhost:10300";
+
+ /** Cluster management group node name. */
+ protected static final String CMG_NODE = "node1";
+ /** MetaStorage group node name. */
+ protected static final String METASTORAGE_NODE = "node3";
+ /** Data node 1 name. */
+ protected static final String DATA_NODE = "node2"; // Partition leader.
+ /** Data node 2 name. */
+ protected static final String DATA_NODE_2 = "node4";
+ /** New node name. */
+ protected static final String NEW_NODE = "newNode";
+
+ /** Nodes configurations. */
+ protected static final Map<String, String> nodesCfg = Map.of(
+ "node1", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3344,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node2", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3345,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node3", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3346,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "node4", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3347,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}",
+ "newNode", "{\n"
+ + " \"network\": {\n"
+ + " \"port\":3348,\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+
+ // TODO: Change Map -> Set.
+ // A map is used because node names are used for partition affinity calculation,
+ // but we can't guarantee that a node with name "DATA_NODE" will own a partition.
+ protected static final Map<String, String> nodeAliasToNameMapping = Map.of(
+ "C", CMG_NODE,
+ "M", METASTORAGE_NODE,
+ "D", DATA_NODE,
+ "D2", DATA_NODE_2
+ );
+
+ /** Resolves a single node alias (a key of {@code nodeAliasToNameMapping}) to the node name, or {@code null} if the alias is unknown. */
+ protected static String resolve(String nodeAliases) {
+ return nodeAliasToNameMapping.get(nodeAliases);
+ }
+
+ /** Cluster nodes. */
+ protected final Map<String, CompletableFuture<Ignite>> clusterNodes = new HashMap<>();
+
+ /** Starts the given nodes, initializes the cluster, creates table {@code tbl1} with one row and returns the node start futures. */
+ protected List<CompletableFuture<Ignite>> initGrid(Collection<String> nodes) throws Exception {
+ List<CompletableFuture<Ignite>> futures = startNodes(nodes);
+
+ // Init cluster (presumably: metastorage on METASTORAGE_NODE, CMG on CMG_NODE - confirm against IgnitionManager.init).
+ IgnitionManager.init(CMG_NODE, List.of(METASTORAGE_NODE), List.of(CMG_NODE), "cluster");
+
+ for (CompletableFuture<Ignite> future : futures) {
+ assertThat(future, willCompleteSuccessfully()); // Every node start future must complete before the table is created.
+ }
+
+ // Create the test table through the first started node and insert a single row (id = 1).
+ IgniteImpl node = (IgniteImpl) futures.get(0).join();
+ sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH partitions = 1, replicas = 1");
+
+ sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)");
+
+ return futures;
+ }
+
+ /**
+ * Start nodes.
+ *
+ * @param names Nodes names.
+ * @return Nodes start futures.
+ */
+ protected List<CompletableFuture<Ignite>> startNodes(Collection<String> names) {
+ return names.stream()
+ .map(this::startNode)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Stops all nodes registered in {@code clusterNodes}.
+ */
+ protected void stopAllNodes() {
+ List<String> names0 = List.copyOf(clusterNodes.keySet()); // Snapshot: stopNode() removes entries from clusterNodes.
+
+ for (int i = names0.size() - 1; i >= 0; i--) { // NOTE(review): clusterNodes is a HashMap, so this is not the reverse start order.
+ stopNode(names0.get(i));
+ }
+ }
+
+ /**
+ * Starts node with its configuration from {@code nodesCfg} and registers the start future in {@code clusterNodes}.
+ *
+ * @param nodeName Node name.
+ * @return Node start future.
+ */
+ protected CompletableFuture<Ignite> startNode(String nodeName) {
+ String nodeConfig = nodesCfg.get(nodeName);
+
+ CompletableFuture<Ignite> fut = IgnitionManager.start(nodeName, nodeConfig, WORK_DIR.resolve(nodeName));
+
+ clusterNodes.put(nodeName, fut); // Registered right away, even though the future may not be complete yet.
+
+ return fut;
+ }
+
+ /**
+ * Stops node and removes it from {@code clusterNodes}.
+ *
+ * @param nodeName Node name.
+ */
+ protected void stopNode(String nodeName) {
+ CompletableFuture<Ignite> rmv = clusterNodes.remove(nodeName);
+
+ assert rmv != null; // The node is expected to have been registered in clusterNodes (see startNode()).
+
+ IgnitionManager.stop(nodeName);
+ }
+
+ /**
+ * Check node was started.
+ *
+ * @param nodeName Node name.
+ * @return {@code true} if node was started, {@code false} otherwise.
+ */
+ protected boolean isNodeStarted(String nodeName) {
+ return clusterNodes.containsKey(nodeName);
+ }
+
+ /**
+ * Executes an SQL query in a new session (default schema {@code PUBLIC}) and returns all rows from the cursor.
+ *
+ * @param node Node initiator.
+ * @param tx Transaction or {@code null}.
+ * @param sql Sql query.
+ * @param args Query arguments.
+ * @return Query result.
+ */
+ protected static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) {
+ var queryEngine = ((IgniteImpl) node).queryEngine();
+
+ SessionId sessionId = queryEngine.createSession(5_000, PropertiesHolder.fromMap(
+ Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+ ));
+
+ try {
+ var context = tx != null ? QueryContext.of(tx) : QueryContext.of(); // Empty context when no transaction is given.
+
+ return getAllFromCursor(
+ await(queryEngine.querySingleAsync(sessionId, context, sql, args))
+ );
+ } finally {
+ queryEngine.closeSession(sessionId); // The session is closed whether the query succeeds or fails.
+ }
+ }
+
+ /**
+ * Checks if node is present in logical topology.
+ *
+ * @param nodeId Node id.
+ * @return {@code True} if node is present in logical topology, {@code false} otherwise.
+ */
+ protected static boolean logicalTopologyContainsNode(String nodeId) {
+ return nodeInTopology("logical", nodeId);
+ }
+
+
+ /**
+ * Checks if node is present in physical topology.
+ *
+ * @param nodeId Node id.
+ * @return {@code True} if node is present in physical topology, {@code false} otherwise.
+ */
+ protected static boolean physicalTopologyContainsNode(String nodeId) {
+ return nodeInTopology("physical", nodeId);
+ }
+
+ /** Runs CLI "cluster topology {type}" against NODE_URL, asserts empty stderr, and checks that {@code nodeId} occurs as a whole word in stdout. */
+ private static boolean nodeInTopology(String topologyType, String nodeId) {
+ StringWriter out = new StringWriter();
+ StringWriter err = new StringWriter();
+ try (ApplicationContext ctx = ApplicationContext.run(Environment.TEST)) { // Close the context: this helper is called many times per test.
+ new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ctx))
+ .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
+ .setOut(new PrintWriter(out, true))
+ .setErr(new PrintWriter(err, true))
+ .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
+ }
+ assertThat(err.toString(), IsEmptyString.emptyString());
+
+ return Pattern.compile("\\b" + Pattern.quote(nodeId) + "\\b").matcher(out.toString()).find(); // Quote: nodeId is a literal, not a regex.
+ }
+
+ /**
+ * Sequence generator. Brute force algo to generate all possible sequences from given items regarding the filters. Sequence doesn't
+ * allow duplicate items.
+ */
+ static class SequenceGenerator {
+ /** Current sequence. */
+ private final LinkedHashSet<String> currentSequence = new LinkedHashSet<>();
+
+ /** Result sequences. */
+ private final List<List<String>> result = new ArrayList<>();
+
+ /** Available items. */
+ private final Collection<String> items;
+
+ /** Filter that accepts next item candidate and the current sequence. */
+ private final BiPredicate<String, Set<String>> itemFilter;
+
+ /** Sequence filter. */
+ private final Predicate<Set<String>> sequenceFilter;
+
+ /**
+ * Creates sequence generator.
+ *
+ * @param items Items.
+ * @param itemCandidateFilter Items candidate filter that accepts new item candidate and current sequence state.
+ * @param sequenceFilter Sequence filter.
+ */
+ SequenceGenerator(
+ Set<String> items,
+ BiPredicate<String, Set<String>> itemCandidateFilter,
+ Predicate<Set<String>> sequenceFilter
+ ) {
+ this.items = items;
+ this.itemFilter = itemCandidateFilter;
+ this.sequenceFilter = sequenceFilter;
+ }
+
+ /**
+ * Start sequences generation.
+ *
+ * @return Generated sequences (the generator's internal result list; results accumulate if this method is called more than once).
+ */
+ List<List<String>> generate() {
+ generate0(items);
+
+ return result;
+ }
+
+ /** Extends the current sequence with each accepted item from {@code availableNodes} in turn, recording every sequence that passes the sequence filter. */
+ private void generate0(Collection<String> availableNodes) {
+ if (sequenceFilter.test(currentSequence)) {
+ result.add(List.copyOf(currentSequence)); // Copy mutable collection.
+ }
+
+ for (String node : availableNodes) {
+ if (!itemFilter.test(node, currentSequence)) {
+ continue; // Skip node from adding to the current grid.
+ }
+
+ currentSequence.add(node);
+
+ Set<String> unusedNodes = new LinkedHashSet<>(availableNodes); // Copy keeps iteration order; the iterated collection is not modified.
+ unusedNodes.remove(node);
+
+ generate0(unusedNodes);
+
+ currentSequence.remove(node); // Backtrack before trying the next candidate.
+ }
+ }
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java
new file mode 100644
index 0000000000..c4b2161b9a
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItClusterStartupTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+/**
+ * Test node start/stop in different scenarios and validate grid components behavior depending on availability/absence of quorums.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ItClusterStartupTest extends AbstractClusterStartStopTest {
+ @BeforeEach
+ public void before() throws Exception {
+ for (String name : nodesCfg.keySet()) {
+ IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
+ }
+
+ initGrid(nodeAliasToNameMapping.values());
+
+ // Shutdown cluster.
+ stopAllNodes();
+ }
+
+ /** Runs after each test sequence. */
+ @AfterEach
+ public void afterEach() {
+ stopAllNodes();
+
+ for (String name : nodesCfg.keySet()) {
+ IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
+ }
+ }
+
+ /**
+ * Generate node start sequences: orderings of all node aliases in which "C" comes first and "D2" comes after "D".
+ *
+ * @return Test parameters.
+ */
+ static Object[] generateParameters() {
+ return new SequenceGenerator(
+ nodeAliasToNameMapping.keySet(),
+ (name, grid) -> (!grid.isEmpty() || "C".equals(name)) // CMG node always starts first.
+ && (!"D2".equals(name) || grid.contains("D")), // Data nodes are interchangeable.
+ grid -> grid.size() == nodeAliasToNameMapping.size()
+ ).generate().toArray(Object[]::new);
+ }
+
+ /** Checks new node joining to the grid. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testNodeJoin(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkNodeJoin();
+ }
+ }
+
+ /** Checks table creation. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testCreateTable(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkCreateTable();
+ }
+ }
+
+ /** Checks implicit transaction. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testImplicitTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ int key = UNIQ_INT.get();
+
+ checkImplicitTx((node, tx) -> {
+ RecordView<Tuple> tupleRecordView = node.tables().table("tbl1").recordView();
+
+ return tupleRecordView.insertAsync(tx, Tuple.create(Map.of("id", key, "val", key)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join();
+ });
+ }
+ }
+
+ /** Checks a record insert within an explicit read-write transaction after each node start of the given order. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testReadWriteTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ int key = UNIQ_INT.get();
+
+ checkTxRW((node, tx) -> { // Explicit read-write tx; checkImplicitTx would pass tx == null and repeat testImplicitTransaction.
+ RecordView<Tuple> tupleRecordView = node.tables().table("tbl1").recordView();
+
+ return tupleRecordView.insertAsync(tx, Tuple.create(Map.of("id", key, "val", key)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join();
+ });
+ }
+ }
+
+ /** Checks read-only transaction. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testReadOnlyTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkTxRO((node, tx) -> node.tables().table("tbl1")
+ .keyValueView()
+ .getAsync(tx, Tuple.create(Map.of("id", 1)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join());
+ }
+ }
+
+ /** Checks implicit transaction. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testSqlWithImplicitTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ int key = UNIQ_INT.get();
+
+ checkImplicitTx((node, tx) -> sql(node, null, String.format("INSERT INTO tbl1 VALUES (%d, %d)", key, key)));
+ }
+ }
+
+ /** Checks read-write transaction. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testSqlWithReadWriteTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ int key = UNIQ_INT.get();
+
+ checkTxRW((node, tx) -> sql(node, tx, String.format("INSERT INTO tbl1 VALUES (%d, %d)", key, key)));
+ }
+ }
+
+ /** Checks read-only transaction. */
+ @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateParameters")
+ public void testSqlWithReadOnlyTransaction(List<String> nodeAliases) {
+ for (String alias : nodeAliases) {
+ log.info("Starting node: alias=" + alias + ", name=" + resolve(alias));
+
+ startNode(resolve(alias));
+
+ checkTxRO((node, tx) -> sql(node, tx, "SELECT * FROM tbl1"));
+ }
+ }
+
+ /** Starts NEW_NODE, checks how far the join gets for the currently started nodes, then always stops and unregisters NEW_NODE. */
+ private void checkNodeJoin() {
+ try {
+ CompletableFuture<Ignite> fut = startNode(NEW_NODE);
+
+ if (!isNodeStarted(CMG_NODE)) {
+ assertThrowsWithCause(() -> fut.get(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+ assertTrue(physicalTopologyContainsNode(NEW_NODE));
+ // CMG, which holds logical topology state, is unavailable.
+ assertThrowsWithCause(() -> logicalTopologyContainsNode(NEW_NODE), IgniteException.class);
+
+ return;
+ } else if (!isNodeStarted(METASTORAGE_NODE)) {
+ // Node future can't complete as some components requires Metastorage on start.
+ assertThrowsWithCause(() -> fut.get(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+ assertTrue(physicalTopologyContainsNode(NEW_NODE));
+ assertFalse(logicalTopologyContainsNode(NEW_NODE)); //TODO: Is Metastore required to promote node to logical topology?
+
+ return;
+ }
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ assertTrue(physicalTopologyContainsNode(((IgniteImpl) fut.join()).id()));
+ assertTrue(logicalTopologyContainsNode(((IgniteImpl) fut.join()).id()));
+ } finally {
+ clusterNodes.remove(NEW_NODE); // Unregister too, so clusterNodes has no stale entry and stopAllNodes() won't stop it again.
+ IgnitionManager.stop(NEW_NODE);
+ }
+ }
+
+ private void checkCreateTable() { // Creates and then drops a temporary table on an initialized node, if one is available.
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1";
+ String dropTableCommand = "DROP TABLE IF EXISTS tempTbl";
+
+ try {
+ sql(node, null, createTableCommand);
+ } finally {
+ sql(node, null, dropTableCommand); // Runs even if creation failed; IF EXISTS makes the drop tolerate a missing table.
+ }
+ }
+
+ /** Runs {@code op} in a read-only transaction and checks the outcome expected for the set of currently started nodes. */
+ private void checkTxRO(BiFunction<Ignite, Transaction, Object> op) {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ Transaction roTx = node.transactions().readOnly().begin();
+
+ try {
+ if (!isNodeStarted(DATA_NODE) && !isNodeStarted(DATA_NODE_2)) {
+ assertThrowsWithCause(() -> op.apply(node, roTx), Exception.class);
+
+ return;
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // else if (!clusterNodes.containsKey(DATA_NODE_2)) {
+ } else if (isNodeStarted(DATA_NODE_2) && clusterNodes.size() <= 2 /* no quorum */) {
+ // Fake transaction with a timestamp from the past.
+ Transaction tx0 = Mockito.spy(roTx);
+ Mockito.when(tx0.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0));
+
+ op.apply(node, tx0); // Use the spied transaction; passing roTx here left the faked timestamp unused.
+
+ // Transaction with recent timestamp.
+ assertThrowsWithCause(() -> op.apply(node, roTx), Exception.class);
+
+ return;
+ }
+
+ op.apply(node, roTx);
+ } finally {
+ roTx.rollback();
+ }
+ }
+
+ private void checkImplicitTx(BiFunction<Ignite, Transaction, Object> op) { // Applies op with a null transaction.
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (clusterNodes.size() <= 2 || !isNodeStarted(DATA_NODE)) {
+ assertThrowsWithCause(() -> op.apply(node, null), Exception.class); // In this cluster state the operation is expected to fail.
+
+ return;
+ }
+
+ op.apply(node, null);
+ }
+
+ private void checkTxRW(BiFunction<Ignite, Transaction, Object> op) { // Applies op inside an explicitly started transaction.
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (clusterNodes.size() <= 2 || !isNodeStarted(DATA_NODE)) {
+ Transaction tx = node.transactions().begin();
+ try {
+ assertThrowsWithCause(() -> op.apply(node, tx), Exception.class); // In this cluster state the operation is expected to fail.
+ } finally {
+ tx.rollback();
+ }
+
+ return;
+ }
+
+ Transaction tx = node.transactions().begin();
+ try {
+ op.apply(node, tx);
+
+ tx.commit();
+ } finally {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-18324
+ // tx.rollback();
+ }
+ }
+
+ /** Returns a started node taken from {@code clusterNodes}, or {@code null} when the metastorage node is not running. */
+ private @Nullable Ignite initializedNode() {
+ assert !clusterNodes.isEmpty();
+
+ CompletableFuture<Ignite> nodeFut = clusterNodes.values().iterator().next(); // First entry in map iteration order.
+
+ if (!isNodeStarted(METASTORAGE_NODE)) {
+ assertThrowsWithCause(() -> nodeFut.get(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+ // Assumed, there is no available Ignite instance in grid, which is required for running some checks.
+ clusterNodes.forEach((k, v) -> assertNull(v.getNow(null), k));
+
+ return null;
+ }
+
+ assertThat(nodeFut, willCompleteSuccessfully());
+
+ return nodeFut.join();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java
new file mode 100644
index 0000000000..831baf407c
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeRestartTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.micronaut.configuration.picocli.MicronautFactory;
+import io.micronaut.context.ApplicationContext;
+import io.micronaut.context.env.Environment;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.hamcrest.text.IsEmptyString;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+import picocli.CommandLine;
+
+/**
+ * Tests node start/stop in different scenarios and validates the behavior of grid services depending on whether quorums are available.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ItNodeRestartTest extends AbstractClusterStartStopTest {
+ /**
+ * Initializes the grid with every node name from {@code nodeAliasToNameMapping} and then stops all nodes,
+ * so that each test starts from a stopped grid.
+ *
+ * @throws Exception If grid initialization fails.
+ */
+ @BeforeEach
+ public void before() throws Exception {
+ initGrid(nodeAliasToNameMapping.values());
+
+ stopAllNodes();
+ }
+
+
+    /** Stops every node, then deletes the work directory of each node listed in {@code nodesCfg}. */
+    @AfterEach
+    public void afterEach() {
+        stopAllNodes();
+
+        nodesCfg.keySet().forEach(nodeName -> IgniteUtils.deleteIfExists(WORK_DIR.resolve(nodeName)));
+    }
+
+    /**
+     * Generates the grid configurations to run the parameterized tests against.
+     *
+     * @return Test parameters.
+     */
+    static Object[] generateSequence() {
+        // Accept each distinct node set only once, and only when it has more than two nodes.
+        Predicate<Set<String>> distinctGridsOverTwoNodes = new UniqueSetFilter<String>().and(grid -> grid.size() > 2);
+
+        return new SequenceGenerator(
+                nodeAliasToNameMapping.keySet(),
+                // Data nodes are interchangeable: "D2" is accepted only when the grid contains "D".
+                (name, grid) -> !("D2".equals(name) && !grid.contains("D")),
+                distinctGridsOverTwoNodes
+        ).generate().toArray(Object[]::new);
+    }
+
+ /** Checks new node joining to the grid. */
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testNodeJoin(List<String> nodeNames) {
+ runTest(nodeNames, () -> checkNodeJoin(NEW_NODE));
+ }
+
+    /** Checks that a table can be created and dropped. */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18328")
+    @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+    @MethodSource("generateSequence")
+    public void testCreateTable(List<String> nodeNames) {
+        runTest(nodeNames, () -> checkCreateTable());
+    }
+
+ /** Checks implicit transaction. */
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testImplicitTransaction(List<String> nodeNames) {
+ int key = UNIQ_INT.get();
+
+ runTest(nodeNames, () -> checkImplicitTx((node, tx) -> {
+ RecordView<Tuple> tupleRecordView = node.tables().tableAsync("tbl1")
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join()
+ .recordView();
+
+ return tupleRecordView.insertAsync(tx, Tuple.create(Map.of("id", key, "val", key)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join();
+ }));
+ }
+
+ /** Checks read-write transaction. */
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testReadWriteTransaction(List<String> nodeNames) {
+ int key = UNIQ_INT.get();
+
+ runTest(nodeNames, () -> checkTxRW((node, tx) -> {
+ RecordView<Tuple> tupleRecordView = node.tables().tableAsync("tbl1")
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join()
+ .recordView();
+
+ return tupleRecordView.insertAsync(tx, Tuple.create(Map.of("id", key, "val", key)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ .join();
+ }));
+ }
+
+ /** Checks read-only transaction. */
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testReadOnlyTransaction(List<String> nodeNames) {
+ runTest(nodeNames, () ->
+ checkTxRO((node, tx) -> node.tables().table("tbl1").keyValueView().getAsync(tx, Tuple.create(Map.of("id", 1)))
+ .orTimeout(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS)));
+ }
+
+    /**
+     * Checks an SQL {@code INSERT} into {@code tbl1} executed in an implicit transaction.
+     *
+     * @param nodeNames Aliases of the nodes that form the grid.
+     */
+    @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+    @MethodSource("generateSequence")
+    public void testSqlWithImplicitTransaction(List<String> nodeNames) {
+        runTest(nodeNames, () -> checkImplicitTx((node, tx) -> {
+            // Draw the key per execution: runTest runs the body several times against the same table, so a reused key
+            // could collide with a previously inserted row. Assumes UNIQ_INT.get() yields a new value on each call.
+            int key = UNIQ_INT.get();
+
+            // Actually execute the statement; merely formatting the SQL text exercises nothing.
+            return sql(node, tx, String.format("INSERT INTO tbl1 VALUES (%d, %d)", key, key));
+        }));
+    }
+
+    /**
+     * Checks an SQL {@code INSERT} into {@code tbl1} executed in an explicit read-write transaction.
+     *
+     * @param nodeNames Aliases of the nodes that form the grid.
+     */
+    @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+    @MethodSource("generateSequence")
+    public void testSqlWithReadWriteTransaction(List<String> nodeNames) {
+        runTest(nodeNames, () -> checkTxRW((node, tx) -> {
+            // Draw the key per execution: runTest runs the body several times against the same table, so a reused key
+            // could collide with a previously inserted row. Assumes UNIQ_INT.get() yields a new value on each call.
+            int key = UNIQ_INT.get();
+
+            // Actually execute the statement; merely formatting the SQL text exercises nothing.
+            return sql(node, tx, String.format("INSERT INTO tbl1 VALUES (%d, %d)", key, key));
+        }));
+    }
+
+ /** Checks read-only transaction. */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18328")
+ @ParameterizedTest(name = "Grid=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER)
+ @MethodSource("generateSequence")
+ public void testSqlWithReadOnlyTransaction(List<String> nodeNames) {
+ runTest(nodeNames, () -> checkTxRO((node, tx) -> sql(node, tx, "SELECT * FROM tbl1")));
+ }
+
+    /**
+     * Runs the test body around a restart of each node of the given grid.
+     *
+     * <p>For every alias in {@code nodeNames}: pre-starts the grid, stops the aliased node, runs the body, starts the node again
+     * and runs the body once more. All nodes are stopped at the end of each iteration, even when a step fails.
+     *
+     * @param nodeNames Aliases of the nodes that form the grid.
+     * @param testBody Checks to run while the node is down and again after it has been started.
+     */
+    private void runTest(List<String> nodeNames, Runnable testBody) {
+        // Resolve aliases to real node names, keeping the given order and dropping duplicates.
+        Set<String> gridNodeNames = new LinkedHashSet<>();
+
+        for (String alias : nodeNames) {
+            gridNodeNames.add(nodeAliasToNameMapping.get(alias));
+        }
+
+        for (String alias : nodeNames) {
+            try {
+                String restartedNode = nodeAliasToNameMapping.get(alias);
+
+                prestartGrid(gridNodeNames);
+
+                log.info("Restarting node: label=" + alias + ", name=" + restartedNode);
+
+                stopNode(restartedNode);
+
+                testBody.run();
+
+                startNode(restartedNode);
+
+                testBody.run();
+            } finally {
+                stopAllNodes();
+            }
+        }
+    }
+
+    /**
+     * Brings the grid up so that exactly the requested nodes keep running.
+     *
+     * <p>Starts the nodes aliased {@code "C"} and {@code "M"} first, then every requested node that is not yet a key of
+     * {@code clusterNodes}, waits for all start futures, and finally stops the started nodes that were not requested.
+     *
+     * @param nodeNames Names of the nodes that must be running afterwards.
+     */
+    private void prestartGrid(Set<String> nodeNames) {
+        Set<String> wantedNodes = Set.copyOf(nodeNames);
+
+        // Start CMG and MetaStorage first, to activate cluster.
+        List<CompletableFuture<Ignite>> startFuts = new ArrayList<>();
+        startFuts.add(startNode(nodeAliasToNameMapping.get("C")));
+        startFuts.add(startNode(nodeAliasToNameMapping.get("M")));
+
+        for (String nodeName : nodeNames) {
+            if (!clusterNodes.containsKey(nodeName)) {
+                startFuts.add(startNode(nodeName));
+            }
+        }
+
+        assertThat(CompletableFuture.allOf(startFuts.toArray(CompletableFuture[]::new)), willCompleteSuccessfully());
+
+        // Stop unwanted nodes.
+        for (CompletableFuture<Ignite> startFut : startFuts) {
+            String startedName = startFut.join().name();
+
+            if (!wantedNodes.contains(startedName)) {
+                stopNode(startedName);
+            }
+        }
+    }
+
+    /**
+     * Starts the given node and validates how far it gets into the cluster, depending on which system nodes are running.
+     *
+     * <p>Without {@code CMG_NODE} the start must time out, the node must be listed in the physical topology and the logical
+     * topology request must fail. Without {@code METASTORAGE_NODE} the start must time out and the node must be listed in the
+     * physical topology only. Otherwise the start must succeed and the node id must be listed in both topologies.
+     * The node is stopped via {@code IgnitionManager} in every case.
+     *
+     * @param nodeName Name of the node to start.
+     */
+    private void checkNodeJoin(String nodeName) {
+        try {
+            CompletableFuture<Ignite> joinFut = startNode(nodeName);
+
+            if (!isNodeStarted(CMG_NODE)) {
+                assertThrowsWithCause(() -> joinFut.get(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+                assertTrue(topologyContainsNode("physical", nodeName));
+
+                // CMG holds logical topology state.
+                assertThrowsWithCause(() -> topologyContainsNode("logical", nodeName), IgniteException.class);
+
+                return;
+            }
+
+            if (!isNodeStarted(METASTORAGE_NODE)) {
+                // Node future can't complete as some components requires Metastorage on start.
+                assertThrowsWithCause(() -> joinFut.get(OPERATION_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class);
+
+                assertTrue(topologyContainsNode("physical", nodeName));
+                // TODO: Is Metastore required to promote node to logical topology?
+                assertFalse(topologyContainsNode("logical", nodeName));
+
+                return;
+            }
+
+            assertThat(joinFut, willCompleteSuccessfully());
+
+            String nodeId = ((IgniteImpl) joinFut.join()).id();
+
+            assertTrue(topologyContainsNode("physical", nodeId));
+            assertTrue(topologyContainsNode("logical", nodeId));
+        } finally {
+            IgnitionManager.stop(nodeName);
+        }
+    }
+
+    /**
+     * Creates the table {@code tempTbl} through SQL on an initialized node and always drops it afterwards.
+     * Does nothing when no node is initialized.
+     */
+    private void checkCreateTable() {
+        Ignite igniteNode = initializedNode();
+
+        if (igniteNode != null) {
+            try {
+                sql(igniteNode, null, "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1");
+            } finally {
+                sql(igniteNode, null, "DROP TABLE IF EXISTS tempTbl");
+            }
+        }
+    }
+
+    /**
+     * Runs the given operation in read-only transactions and validates the outcome against the set of running data nodes.
+     *
+     * <p>Does nothing when no node is initialized. With neither {@code DATA_NODE} nor {@code DATA_NODE_2} started, the operation
+     * must fail with an {@link IgniteException}. With only {@code DATA_NODE_2} missing, the operation must succeed in a
+     * transaction whose read timestamp is stubbed to an old value and must fail in a regular one. Otherwise the operation is
+     * executed and the transaction is committed.
+     *
+     * @param op Operation to run; receives the node and the transaction.
+     */
+    private void checkTxRO(BiFunction<Ignite, Transaction, Object> op) {
+        Ignite igniteNode = initializedNode();
+
+        if (igniteNode == null) {
+            return;
+        }
+
+        if (!(isNodeStarted(DATA_NODE) || isNodeStarted(DATA_NODE_2))) {
+            Transaction failingTx = igniteNode.transactions().readOnly().begin();
+
+            assertThrowsWithCause(() -> op.apply(igniteNode, failingTx), IgniteException.class);
+
+            return;
+        }
+
+        if (!isNodeStarted(DATA_NODE_2)) {
+            // Fake transaction with a timestamp from the past.
+            Transaction staleTx = Mockito.spy(igniteNode.transactions().readOnly().begin());
+            Mockito.when(staleTx.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0));
+
+            try {
+                op.apply(igniteNode, staleTx);
+
+                staleTx.commit();
+            } catch (Throwable th) {
+                staleTx.rollback();
+
+                throw th;
+            }
+
+            // Transaction with recent timestamp.
+            Transaction recentTx = igniteNode.transactions().readOnly().begin();
+
+            assertThrowsWithCause(() -> op.apply(igniteNode, recentTx), IgniteException.class);
+
+            return;
+        }
+
+        Transaction readOnlyTx = igniteNode.transactions().readOnly().begin();
+
+        try {
+            op.apply(igniteNode, readOnlyTx);
+
+            readOnlyTx.commit();
+        } catch (Throwable th) {
+            readOnlyTx.rollback();
+
+            throw th;
+        }
+    }
+
+ public void checkImplicitTx(BiFunction<Ignite, Transaction, Object> op) {
+ Ignite node = initializedNode();
+
+ if (node == null) {
+ return;
+ }
+
+ // TODO: Bound table distribution zone to data nodes and uncomment.
+ // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) {
+ if (!isNodeStarted(DATA_NODE) || (!isNodeStarted(CMG_NODE) && clusterNodes.get(DATA_NODE).getNow(null) == null)
+ || !isNodeStarted(METASTORAGE_NODE) ||(!isNodeStarted(CMG_NODE) && clusterNodes.get(METASTORAGE_NODE).getNow(null) == null)
+ ) {
+ assertThrowsWithCause(() -> op.apply(node, null), Exception.class);
+
+ return;
+ }
+
+ op.apply(node, null);
+ }
+
+    /**
+     * Runs the given operation inside an explicit read-write transaction and validates the outcome.
+     *
+     * <p>Does nothing when no node is initialized. When {@code DATA_NODE} is not started, the operation is expected to throw and
+     * the transaction is rolled back. Otherwise the operation is executed and the transaction is committed; on any failure the
+     * transaction is rolled back and the failure is rethrown.
+     *
+     * @param op Operation to run; receives the node and the transaction.
+     */
+    private void checkTxRW(BiFunction<Ignite, Transaction, Object> op) {
+        Ignite igniteNode = initializedNode();
+
+        if (igniteNode == null) {
+            return;
+        }
+
+        boolean dataNodeStarted = isNodeStarted(DATA_NODE);
+
+        Transaction rwTx = igniteNode.transactions().begin();
+
+        if (dataNodeStarted) {
+            try {
+                op.apply(igniteNode, rwTx);
+
+                rwTx.commit();
+            } catch (Throwable th) {
+                rwTx.rollback();
+
+                throw th;
+            }
+
+            return;
+        }
+
+        try {
+            assertThrowsWithCause(() -> op.apply(igniteNode, rwTx), Exception.class);
+        } finally {
+            rwTx.rollback();
+        }
+    }
+
+    /**
+     * Runs the CLI command {@code cluster topology <topologyType>} against {@code NODE_URL} and looks for the node in its output.
+     *
+     * <p>Asserts that the command wrote nothing to its error stream.
+     *
+     * @param topologyType Topology kind passed to the CLI; callers use {@code "physical"} and {@code "logical"}.
+     * @param nodeId Node identifier (or name) to look for as a whole word in the command output.
+     * @return {@code true} if the output contains the identifier.
+     */
+    private static boolean topologyContainsNode(String topologyType, String nodeId) {
+        StringWriter out = new StringWriter();
+        StringWriter err = new StringWriter();
+
+        // Close the Micronaut context once the command has run, so that repeated calls don't accumulate live contexts.
+        try (ApplicationContext cliContext = ApplicationContext.run(Environment.TEST)) {
+            new CommandLine(TopLevelCliCommand.class, new MicronautFactory(cliContext))
+                    .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
+                    .setOut(new PrintWriter(out, true))
+                    .setErr(new PrintWriter(err, true))
+                    .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL);
+        }
+
+        assertThat(err.toString(), IsEmptyString.emptyString());
+
+        // Quote the identifier: it is literal text, not a pattern, and may contain regex metacharacters.
+        return Pattern.compile("\\b" + Pattern.quote(nodeId) + "\\b").matcher(out.toString()).find();
+    }
+
+    /** Returns the Ignite instance of a node whose future already yields one, or {@code null} if there is no such node. */
+    private @Nullable Ignite initializedNode() {
+        assert !clusterNodes.isEmpty();
+
+        for (CompletableFuture<Ignite> nodeFut : clusterNodes.values()) {
+            Ignite startedNode = nodeFut.getNow(null);
+
+            if (startedNode != null) {
+                return startedNode;
+            }
+        }
+
+        return null;
+    }
+
+
+    /** Predicate that accepts a set only the first time a set with equal content is offered. */
+    private static class UniqueSetFilter<T> implements Predicate<Set<T>> {
+        final Set<Set<T>> seenBefore = new HashSet<>();
+
+        @Override
+        public boolean test(Set<T> s) {
+            // Remember a snapshot rather than the argument itself, as the caller may keep mutating its set.
+            Set<T> snapshot = new HashSet<>(s);
+
+            return seenBefore.add(snapshot);
+        }
+    }
+}