Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/20 08:09:03 UTC

[GitHub] [ignite-3] rpuch opened a new pull request, #1461: IGNITE-18079 Integrate RAFT streaming snapshots

rpuch opened a new pull request, #1461:
URL: https://github.com/apache/ignite-3/pull/1461

   https://issues.apache.org/jira/browse/IGNITE-18079


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057612945


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   Without `lenient`, a test (unrelated to this PR) started failing. It happened because I had to add `@ExtendWith(ConfigurationExtension.class)`.
   
   The mock `RootKey` has to be lenient so that it does not fail a test in which one of the mocked methods is never called.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1059360308


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   We discussed this privately and decided to leave it as is for now.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055408271


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   To the constructor of `MockNode`? Because it's not needed now. The only reason it's passed here is to avoid an NPE.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058235691


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {

Review Comment:
   ```suggestion
       public @Nullable IgniteImpl node(int index) {
   ```





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058223559


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   I added `@ExtendWith(ConfigurationExtension.class)` to `org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorageTest` and nothing failed in my PR.
   
   I also added `@InjectConfiguration private TableConfiguration tableConfiguration;`.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058228930


##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   Will this be enough time if the partitions are large, for example 50 GB?
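
To put the question in numbers, here is a back-of-the-envelope sketch. It assumes, hypothetically, that the whole partition would have to be transferred within a single `rpcInstallSnapshotTimeout`; the thread does not confirm what the timeout actually covers. The `SnapshotTimeoutBudget` class and its method are illustrative names, not part of Ignite.

```java
/** Back-of-the-envelope helper; hypothetical, not part of Ignite. */
final class SnapshotTimeoutBudget {
    /** Default from the diff above: 5 minutes, in milliseconds. */
    static final long DEFAULT_TIMEOUT_MS = 5 * 60 * 1000;

    /** Sustained throughput (bytes per second) needed to move sizeBytes within timeoutMs. */
    static double requiredBytesPerSecond(long sizeBytes, long timeoutMs) {
        if (sizeBytes < 0 || timeoutMs <= 0) {
            throw new IllegalArgumentException("size must be >= 0 and timeout must be > 0");
        }

        return sizeBytes * 1000.0 / timeoutMs;
    }

    public static void main(String[] args) {
        // 50 GB in decimal units (an assumption; the comment does not say GB vs GiB).
        long partitionBytes = 50L * 1000 * 1000 * 1000;

        double bytesPerSecond = requiredBytesPerSecond(partitionBytes, DEFAULT_TIMEOUT_MS);

        System.out.printf("Required throughput: %.1f MB/s%n", bytesPerSecond / 1_000_000);
    }
}
```

Under that assumption, 50 GB in 5 minutes needs roughly 167 MB/s sustained, so the answer depends on whether the timeout bounds the whole transfer or only a single request.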





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058306819


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {

Review Comment:
   This method should never return `null`; I added a check.
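
A minimal sketch of what such a check could look like, assuming the nodes live in a list whose slots may be `null` (for example, for a node that is currently stopped). `NodeRegistry` and its `String` node type are hypothetical stand-ins; the actual check added to `Cluster.node()` is not shown in this thread.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for the node list kept by a test cluster. */
final class NodeRegistry {
    private final List<String> nodes = new CopyOnWriteArrayList<>();

    void add(String node) {
        nodes.add(node);
    }

    /** Marks a slot as empty, e.g. after the node at this index has been stopped. */
    void clear(int index) {
        nodes.set(index, null);
    }

    /** Returns the node at the given index; fails fast instead of returning null. */
    String node(int index) {
        return Objects.requireNonNull(nodes.get(index), "Node at index " + index + " is not available");
    }
}
```

With a fail-fast check like this, the method's contract stays non-null and a `@Nullable` annotation on the return type is not needed.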





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058310661


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   Take a look at `ItRaftGroupServiceTest`: it will fail if you remove `lenient()`. https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunAllTests/6982075?hideProblemsFromDependencies=false&hideTestsFromDependencies=false&pluginCoverage=true&expandCode+Inspection=true&expandBuildChangesSection=true&expandBuildTestsSection=true





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057520306


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   Why `lenient()` ?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {

Review Comment:
   Your code and `AbstractClusterIntegrationTest` are very similar and should be combined: for example, extract `Cluster` into a separate class and use it in both.



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -402,7 +419,8 @@ public void stop() {
 
     /**
      * Installs a predicate, it will be consulted with for each message being sent; when it returns {@code true}, the
-     * message will be silently dropped (it will not be sent, the corresponding future will never complete).
+     * message will be dropped (it will not be sent; the corresponding future will time out soon for {@code invoke()} methods

Review Comment:
   Why should the behavior differ between `send` and `invoke`?
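
For the `invoke()` side described in the Javadoc above, a toy sketch of a drop predicate combined with a response timeout might look as follows. `ToyMessagingService` is hypothetical and is not `DefaultMessagingService`; messages are plain strings, and the "network" is an immediate echo. The behavior of `send()` for dropped messages is cut off in the quoted diff, so it is deliberately left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Toy messaging service; hypothetical, not Ignite's DefaultMessagingService. */
final class ToyMessagingService {
    private volatile Predicate<String> dropPredicate = msg -> false;

    /** Installs a predicate; messages for which it returns true are dropped. */
    void dropMessages(Predicate<String> predicate) {
        this.dropPredicate = predicate;
    }

    /** Request/response call: a dropped request gets no response, so the future fails on timeout. */
    CompletableFuture<String> invoke(String msg, long timeoutMs) {
        CompletableFuture<String> response = new CompletableFuture<String>()
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS);

        if (!dropPredicate.test(msg)) {
            // Stand-in for a real network round trip: echo the message back immediately.
            response.complete("ack:" + msg);
        }

        return response;
    }
}
```

In this sketch a dropped `invoke()` completes exceptionally with a `TimeoutException` once the timeout elapses, rather than hanging forever.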



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,19 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (Throwable e) {
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("onMessage() failed while processing {} from {}", e, obj.message(), obj.consistentId());

Review Comment:
   Why not at the error level?
   How do we remember to remove this before the release of the GE?
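
For comparison, a sketch of the same wrap-and-log pattern at error level, using `java.util.logging` (where the error level is `SEVERE`) instead of Ignite's `IgniteLogger`. `LoggingExecutor` is a hypothetical name; like the diff, it catches `Throwable` so that a failing task is reported through the logger instead of escaping to the executor's worker thread.

```java
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical wrapper: logs any failure of a submitted task at error level. */
final class LoggingExecutor implements Executor {
    private static final Logger LOG = Logger.getLogger(LoggingExecutor.class.getName());

    private final Executor delegate;

    LoggingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                // Log with the stack trace at error level so the failure is hard to miss.
                LOG.log(Level.SEVERE, "Task failed", e);
            }
        });
    }
}
```

Wrapping at the executor boundary keeps the logging decision (level, message) in one place instead of repeating the `try`/`catch` at every call site.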



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   I don’t think it’s a problem to have two constructors: one that takes the configuration, and one that does not take it and mocks it instead.



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   Do I understand correctly that this is the time allowed to request and install a snapshot?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;

Review Comment:
   It occurred to me that we do not need to specify the storage engine at all: in this case, the default is suitable for us.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058308093


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";

Review Comment:
   Discussed in private. Added a TODO to address the problem of forgetting a storage engine.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private void doInSession(int nodeIndex, Consumer<Session> action) {

Review Comment:
   Moved the methods





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058942511


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   It reproduces easily on my machine in this branch





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1059290852


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   I wonder why we don't use the configuration extension here. It would require only a single value, for example with the configuration given as a string.



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);
+        ConfigurationValue<Integer> rpcInstallSnapshotTimeutValue = mock(ConfigurationValue.class);

Review Comment:
   Typo in `timeut`



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();

Review Comment:
   I know I suck at grammar, but shouldn't it be "knockedOutNode**s**Indices"?



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   5 minutes is definitely not long enough. What timeout do we use in Ignite 2.x?
   JRaft was designed for very small state machines; in that case, 5 minutes would have been reasonable. The TODO is good, thank you!
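
To put rough numbers on the concern above, here is a back-of-the-envelope sketch. It assumes the timeout must cover the whole snapshot transfer and that throughput is constant. The 100 MiB/s throughput and the 100 GiB partition size are made-up illustration values, not measurements from Ignite.

```java
final class SnapshotTimeoutEstimate {
    /** Default from the diff above: 5 * 60 * 1000 ms. */
    static final long DEFAULT_TIMEOUT_MS = 5 * 60 * 1000;

    /** Largest snapshot (in bytes) that fits into the timeout at a constant throughput. */
    static long maxSnapshotBytes(long timeoutMs, long bytesPerSecond) {
        return Math.multiplyExact(timeoutMs, bytesPerSecond) / 1000;
    }

    /** Smallest timeout (in ms, rounded up) that lets a snapshot of the given size through. */
    static long requiredTimeoutMs(long snapshotBytes, long bytesPerSecond) {
        long scaled = Math.multiplyExact(snapshotBytes, 1000L);
        return (scaled + bytesPerSecond - 1) / bytesPerSecond;
    }

    public static void main(String[] args) {
        long throughput = 100L * 1024 * 1024;       // assumed: 100 MiB/s
        long partition = 100L * 1024 * 1024 * 1024; // assumed: 100 GiB partition

        System.out.println(maxSnapshotBytes(DEFAULT_TIMEOUT_MS, throughput));
        System.out.println(requiredTimeoutMs(partition, throughput));
    }
}
```

With these assumed figures, the 5-minute default covers about 29 GiB, while a 100 GiB partition would need roughly 17 minutes.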



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {

Review Comment:
   Maybe `aliveNode`? We use this word quite often in Ignite 2.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return Objects.requireNonNull(nodes.get(index));
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutNodeIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        checkNodeIndex(index);
+
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    private void checkNodeIndex(int index) {
+        if (index < 0) {
+            throw new IllegalArgumentException("Index cannot be negative");
+        }
+        if (index >= nodes.size()) {
+            throw new IllegalArgumentException("Cluster only contains " + nodes.size() + " nodes, but node with index "
+                    + index + " was tried to be accessed");
+        }
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        checkNodeIndex(index);
+
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {
+        stopNode(index);
+
+        return startNode(index);
+    }
+
+    /**
+     * Returns {@link RaftGroupService} that is the leader for the given table partition.
+     *
+     * @param tablePartitionId Table partition ID for which a leader is to be found.
+     * @return {@link RaftGroupService} that is the leader for the given table partition.
+     * @throws InterruptedException Thrown if interrupted while waiting for the leader to be found.
+     */
+    public RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+        AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                    serviceRef.set(service);
+
+                    return service != null;
+                }, 10_000),
+                "Did not find a leader for " + tablePartitionId + " in time"
+        );
+
+        RaftGroupService result = serviceRef.get();
+
+        assertNotNull(result);
+
+        return result;
+    }
+
+    @Nullable
+    private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+        return aliveNodes()
+                .map(IgniteImpl.class::cast)
+                .map(ignite -> {
+                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                            .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                            .findAny();
+
+                    return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                })
+                .filter(Objects::nonNull)

Review Comment:
   Or just use `limit(1)` on the original stream and don't use `Optional` at all. `flatMap` is a cool thing!
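
   The suggestion above could look roughly like the following. This is only a sketch: `Server`, its fields, and the string "services" are hypothetical stand-ins for `JraftServerImpl`, `localNodes()` and `raftGroupService`, not Ignite code.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative only: Server is a hypothetical stand-in, not an Ignite class. */
class LimitOneSketch {
    /** Hypothetical server that hosts RAFT groups identified by plain strings. */
    static final class Server {
        final String name;
        final List<String> groupIds;

        Server(String name, List<String> groupIds) {
            this.name = name;
            this.groupIds = groupIds;
        }
    }

    /** Yields at most one "service" per server; servers without the group contribute nothing. */
    static List<String> servicesFor(List<Server> servers, String groupId) {
        return servers.stream()
                .flatMap(server -> server.groupIds.stream()
                        .filter(groupId::equals)
                        // At most one match per server, so no Optional and no null filtering.
                        .limit(1)
                        .map(id -> server.name + "/" + id))
                .collect(Collectors.toList());
    }
}
```

   Because the inner stream is empty when a server does not host the group, neither `orElse(null)` nor `filter(Objects::nonNull)` is needed.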



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java:
##########
@@ -59,7 +61,13 @@ public void testLozaStop() throws Exception {
         Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
         Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
 
-        Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Same question: maybe we should use the configuration extension?



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -201,6 +201,11 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Network
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;

Review Comment:
   Is this test-only functionality in the production code? What the hell is happening here =)
   Please add a big fat comment and maybe a TODO with an issue to fix this.
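
   For illustration, a clearly documented test-only hook might be shaped like the sketch below. Everything here is an assumption made for the example: the class, the `String` message type, the method names, and the choice of a `volatile` field are hypothetical and do not reflect the actual `DefaultMessagingService`.

```java
import java.util.function.BiPredicate;

/** Hypothetical messaging service with a test-only drop hook; not the Ignite implementation. */
class DropHookSketch {
    /**
     * TEST-ONLY hook: when non-null, messages matching this predicate are silently dropped.
     * Production code must never set it.
     * TODO: remove from production code (the tracking issue is not known here).
     */
    private volatile BiPredicate<String, String> dropMessagePredicate;

    /** Installs the test-only predicate (recipient name, message). */
    void dropMessages(BiPredicate<String, String> predicate) {
        dropMessagePredicate = predicate;
    }

    /** Removes the test-only predicate. */
    void stopDroppingMessages() {
        dropMessagePredicate = null;
    }

    /** Returns true if the message would be sent, false if the hook dropped it. */
    boolean send(String recipient, String message) {
        // Read the field once so the null check and the call see the same value.
        BiPredicate<String, String> dropMessage = dropMessagePredicate;

        if (dropMessage != null && dropMessage.test(recipient, message)) {
            return false;
        }

        return true;
    }
}
```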



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"

Review Comment:
   I recommend removing quotes where possible. The HOCON format allows it, and the code will look cleaner. The number of lines can be reduced as well by joining some values, like `raft.rpcInstallSnapshotTimeout = 10000`, but it's up to you. Personally, I don't like it when text occupies too many lines.
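
   As a rough illustration of the difference, the sketch below puts a fully quoted config next to a compact HOCON form of the same tree. Only `raft.rpcInstallSnapshotTimeout = 10000` comes from the comment above; the `network.port` key and its value are assumptions made for the example, and the real bootstrap config may well differ.

```java
/** Illustrative only: key names other than raft.rpcInstallSnapshotTimeout are assumptions. */
class HoconConfigSketch {
    /** Fully quoted, JSON-like form. */
    static final String QUOTED = "{\n"
            + "  \"network\": {\n"
            + "    \"port\": 3344\n"
            + "  },\n"
            + "  \"raft\": {\n"
            + "    \"rpcInstallSnapshotTimeout\": 10000\n"
            + "  }\n"
            + "}";

    /** The same tree in HOCON, with unquoted keys and dotted paths. */
    static final String COMPACT = "{\n"
            + "  network.port = 3344\n"
            + "  raft.rpcInstallSnapshotTimeout = 10000\n"
            + "}";

    /** Counts the lines of a config string. */
    static int lineCount(String text) {
        return text.split("\n").length;
    }
}
```

   In this sketch the compact form takes 4 lines instead of 8.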



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {

Review Comment:
   That's an interesting point. You're technically correct, but it doesn't feel right. I won't ask you to move classes, though; that's really pointless for `runner`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return Objects.requireNonNull(nodes.get(index));
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutNodeIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        checkNodeIndex(index);
+
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    private void checkNodeIndex(int index) {
+        if (index < 0) {
+            throw new IllegalArgumentException("Index cannot be negative");
+        }
+        if (index >= nodes.size()) {
+            throw new IllegalArgumentException("Cluster only contains " + nodes.size() + " nodes, but node with index "
+                    + index + " was tried to be accessed");
+        }
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        checkNodeIndex(index);
+
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {
+        stopNode(index);
+
+        return startNode(index);
+    }
+
+    /**
+     * Returns {@link RaftGroupService} that is the leader for the given table partition.
+     *
+     * @param tablePartitionId Table partition ID for which a leader is to be found.
+     * @return {@link RaftGroupService} that is the leader for the given table partition.
+     * @throws InterruptedException Thrown if interrupted while waiting for the leader to be found.
+     */
+    public RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+        AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                    serviceRef.set(service);
+
+                    return service != null;
+                }, 10_000),
+                "Did not find a leader for " + tablePartitionId + " in time"
+        );
+
+        RaftGroupService result = serviceRef.get();
+
+        assertNotNull(result);
+
+        return result;
+    }
+
+    @Nullable
+    private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+        return aliveNodes()
+                .map(IgniteImpl.class::cast)
+                .map(ignite -> {
+                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                            .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                            .findAny();
+
+                    return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                })
+                .filter(Objects::nonNull)

Review Comment:
   You know, you could have used `Optional#stream` and `flatMap`; that way you wouldn't have to filter out null values or use `orElse`. Maybe that's what you should do, but I don't insist.
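
   A minimal sketch of that idea, assuming Java 9+ for `Optional#stream`. The `Server` class and the string "services" are hypothetical stand-ins for `JraftServerImpl` and `RaftGroupService`, not Ignite code.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Illustrative only: Server is a hypothetical stand-in, not an Ignite class. */
class OptionalStreamSketch {
    /** Hypothetical server that hosts RAFT groups identified by plain strings. */
    static final class Server {
        final String name;
        final List<String> groupIds;

        Server(String name, List<String> groupIds) {
            this.name = name;
            this.groupIds = groupIds;
        }

        /** Returns the "service" for the group if this server hosts it. */
        Optional<String> serviceFor(String groupId) {
            return groupIds.stream()
                    .filter(groupId::equals)
                    .findAny()
                    .map(id -> name + "/" + id);
        }
    }

    /** Collects services from all servers hosting the group, with no null handling. */
    static List<String> servicesFor(List<Server> servers, String groupId) {
        return servers.stream()
                .map(server -> server.serviceFor(groupId))
                // An empty Optional becomes an empty stream and simply disappears.
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }
}
```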





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058235342


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */

Review Comment:
   To me this sounds like SQL indices; I think it could be rephrased.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055446223


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;

Review Comment:
   One test already runs against all storage engines. Doing the same for every test does not seem reasonable: it would multiply the number of tests without adding value.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058220619


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   I do not quite understand: how can a test fail because methods are not called?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058306169


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {

Review Comment:
   `testFixtures` contains classes that might be needed by tests in other modules. `Cluster` is defined in `runner`, and no module depends on `runner` (and probably no module *can* depend on it, since `runner` is designed to sit at the top of the module hierarchy). No test in another module will ever need `testFixtures` from `runner`, so `testFixtures` is simply not needed here.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058239596


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {

Review Comment:
   What happens if there is no node?
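
   As a minimal sketch of one possible answer (not the PR's actual code): in the diff as written, `nodes.get(index).name()` would throw a `NullPointerException` for a slot that `stopNode` has already set to `null`. The hypothetical `NodeSlots` class below stands in for `Cluster` and stores plain node names instead of `IgniteImpl` instances; the explicit guard and its message are assumptions, shown only to illustrate failing with a clearer error.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for the test cluster: node names replace IgniteImpl instances. */
class NodeSlots {
    /** A stopped node leaves a null in its slot, mirroring nodes.set(index, null) in the diff. */
    private final List<String> nodes = new CopyOnWriteArrayList<>();

    void add(String nodeName) {
        nodes.add(nodeName);
    }

    /** Stops the node in the given slot and returns its name; fails clearly if the slot is empty. */
    String stopNode(int index) {
        // get() throws IndexOutOfBoundsException for an index that was never used.
        String name = nodes.get(index);

        if (name == null) {
            // Assumed behaviour: an explicit error instead of a NullPointerException.
            throw new IllegalStateException("Node with index " + index + " is not running");
        }

        nodes.set(index, null);

        return name;
    }

    boolean isRunning(int index) {
        return nodes.get(index) != null;
    }
}
```

   With this guard, stopping the same index twice fails with an `IllegalStateException` that names the index; whether that, or silently ignoring the call, is the better contract for a test utility is a design choice for the PR author.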



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {

Review Comment:
   What happens if there is no node?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058235342


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */

Review Comment:
   To me this sounds like SQL indices; I think it could be reworded.
   Or would it be better to use node IDs?
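
   A rough sketch of what the second suggestion could look like, assuming node names serve as the identifiers. The `KnockedOutNodes` class and its method names are hypothetical and do not appear in the PR; the point is only that tracking names avoids the word "indices" altogether.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks knocked-out nodes by name rather than by list index. */
class KnockedOutNodes {
    /** Concurrent set of node names, built on ConcurrentHashMap from the JDK. */
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    /** Marks a node as knocked out. */
    void knockOut(String nodeName) {
        names.add(nodeName);
    }

    /** Marks a node as reachable again. */
    void reanimate(String nodeName) {
        names.remove(nodeName);
    }

    /** Returns true if the node is currently marked as knocked out. */
    boolean isKnockedOut(String nodeName) {
        return names.contains(nodeName);
    }
}
```

   The trade-off is that lookups such as `entryNode()` would then have to map each list slot to a node name before checking the set.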





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058242275


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private void doInSession(int nodeIndex, Consumer<Session> action) {

Review Comment:
   I think it will be useful to move methods `doInSession`, `executeUpdate` and `query` to `Cluster`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private void doInSession(int nodeIndex, Consumer<Session> action) {

Review Comment:
   I think it will be useful to move methods `doInSession`, `executeUpdate` and `query` to `Cluster`.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058242275


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private void doInSession(int nodeIndex, Consumer<Session> action) {

Review Comment:
   I think it will be useful to move methods `doInSession` and `executeUpdate` to `Cluster`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private void doInSession(int nodeIndex, Consumer<Session> action) {

Review Comment:
   I think it will be useful to move methods `doInSession` and `executeUpdate` to `Cluster`.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058305306


##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   5 minutes is the JRaft default. Added a TODO to think about this default.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055474328


##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,13 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (RuntimeException e) {
+                    LOG.warn("onMessage() failed while processing " + obj.message() + " from " + obj.consistentId(), e);

Review Comment:
   Switched it to `{}` style and added a guard so that this work is only done when the corresponding logging level is enabled. Also switched the level to INFO: we need to see these messages, at least during the active development phase, because they really help when debugging faulty runs on TC.
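
   A minimal sketch of the guarded, placeholder-based logging described above. It is an illustration only: it uses the JDK's `java.util.logging` as a stand-in (numbered `{0}` placeholders rather than `{}`), not `IgniteLogger`, and the names `GuardedLoggingSketch`, `render` and `logFailure` are hypothetical rather than taken from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class GuardedLoggingSketch {
    public static final Logger LOG = Logger.getLogger(GuardedLoggingSketch.class.getName());

    /** Counts how many times the (potentially expensive) argument rendering ran. */
    public static final AtomicInteger RENDER_COUNT = new AtomicInteger();

    /** Stand-in for an expensive call such as converting a message to a string. */
    public static String render(Object value) {
        RENDER_COUNT.incrementAndGet();
        return String.valueOf(value);
    }

    /** Logs a failure at INFO, rendering the arguments only if INFO is enabled. */
    public static void logFailure(Object message, String senderId, RuntimeException e) {
        // Guard: skip all the work when the level is disabled.
        if (!LOG.isLoggable(Level.INFO)) {
            return;
        }

        // Placeholders are filled in by the formatter instead of string concatenation.
        LogRecord record = new LogRecord(Level.INFO, "onMessage() failed while processing {0} from {1}");
        record.setLoggerName(LOG.getName());
        record.setParameters(new Object[] {render(message), senderId});
        record.setThrown(e);

        LOG.log(record);
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.OFF);
        logFailure("msg", "node-1", new IllegalStateException("boom"));
        System.out.println("renders while disabled: " + RENDER_COUNT.get());

        LOG.setLevel(Level.INFO);
        logFailure("msg", "node-1", new IllegalStateException("boom"));
        System.out.println("renders while enabled: " + RENDER_COUNT.get());
    }
}
```

   The guard matters only when preparing the arguments is itself costly; the placeholders alone already defer the final message formatting to the logger.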





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057606190


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Currently, only 3 tests use `MockNode`. None of them needs to pass a configuration, so the only constructor in use is the one that does not accept a configuration and simply mocks it. If I add a second constructor (one that accepts `RaftConfiguration`), it will not be used at all. I think we should not add what we don't need yet; if we need that constructor in the future, we'll add it then.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057614303


##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -402,7 +419,8 @@ public void stop() {
 
     /**
      * Installs a predicate, it will be consulted with for each message being sent; when it returns {@code true}, the
-     * message will be silently dropped (it will not be sent, the corresponding future will never complete).
+     * message will be dropped (it will not be sent; the corresponding future will time out soon for {@code invoke()} methods

Review Comment:
   `invoke` differs from `send` in that it has a `timeout` parameter, so if the recipient never responds, a timeout happens sooner or later. For `send`, this is not the case: if the recipient never responds, the future just hangs forever.
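
   The difference can be sketched with plain `CompletableFuture`s. This is an illustration only, not the code of `DefaultMessagingService`: the class `DroppedMessageSketch` and its methods are hypothetical, and a dropped message is modelled as a future that nobody ever completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DroppedMessageSketch {
    /** Models send(): the message is dropped, so the future is never completed. */
    public static CompletableFuture<String> sendDropped() {
        return new CompletableFuture<>();
    }

    /** Models invoke(): the message is dropped too, but the future is bounded by a timeout. */
    public static CompletableFuture<String> invokeDropped(long timeoutMillis) {
        return new CompletableFuture<String>().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits for the future and reports whether it failed with a TimeoutException. */
    public static boolean timesOut(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> sent = sendDropped();
        CompletableFuture<String> invoked = invokeDropped(100);

        // The invoke-style future fails once the timeout elapses.
        System.out.println("invoke timed out: " + timesOut(invoked));

        // The send-style future is still pending; calling get() on it would block forever.
        System.out.println("send completed: " + sent.isDone());
    }
}
```

   This is why the Javadoc can only promise a timeout for the `invoke()` methods: nothing bounds the future of a dropped `send`.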





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1059360044


##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -201,6 +201,11 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Network
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;

Review Comment:
   Added TODOs to remove or move this code.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055446223


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;

Review Comment:
   One test, `leaderFeedsFollowerWithSnapshot(String storageEngine)`, already runs against all storage engines. Doing the same for every test does not seem reasonable: it would multiply the number of tests without adding value.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058238954


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);

Review Comment:
   If memory serves me right, this call throws an error when the index is not within the current bounds of the list.
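
   The concern can be checked in isolation. The sketch below is not code from this PR: `ListSetDemo` and both of its methods are hypothetical names, and plain strings stand in for `IgniteImpl` instances. It shows that `set()` on a `CopyOnWriteArrayList` is rejected when the index is outside the current bounds, and one possible workaround that pads the list with `null` slots first (an assumption that seems compatible with `entryNode()` filtering out `null` entries).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical demo class, not part of the PR. */
class ListSetDemo {
    /** Returns true if set(index, value) on an empty list is rejected as out of bounds. */
    static boolean setFailsBeyondSize(int index) {
        List<String> nodes = new CopyOnWriteArrayList<>();

        try {
            nodes.set(index, "node");

            return false;
        } catch (IndexOutOfBoundsException e) {
            // The list has no slot at this index yet, so set() cannot replace anything.
            return true;
        }
    }

    /** Possible workaround: pad the list with nulls up to the index, then call set(). */
    static List<String> setWithPadding(List<String> nodes, int index, String value) {
        while (nodes.size() <= index) {
            nodes.add(null);
        }

        nodes.set(index, value);

        return nodes;
    }
}
```

   Whether padding is the right fix depends on how `startNode(int)` is meant to be used; if it is only ever called for indices of previously stopped nodes, a precondition check with a clear message may be enough.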





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058241190


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":{},\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  \"raft\": {"
+            + "    \"rpcInstallSnapshotTimeout\": 10000"
+            + "  }"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";

Review Comment:
   We do not need to specify the default engine; it is already applied.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058939590


##########
modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java:
##########
@@ -273,7 +274,7 @@ private static Object cfgValue(
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());

Review Comment:
   Interesting: I cannot reproduce this locally.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057637423


##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,19 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (Throwable e) {
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("onMessage() failed while processing {} from {}", e, obj.message(), obj.consistentId());

Review Comment:
   Changed this to `error()` as this is actually an error.
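
   For context, the pattern in the hunk above can be sketched on its own: a task handed to an executor is wrapped so that anything it throws is reported at error level. This is a minimal sketch, not the actual `DefaultMessagingService` code. `GuardedTaskDemo`, `executeGuarded` and the `FAILURES` counter are hypothetical, and `java.util.logging` is used in place of `IgniteLogger` to keep the example self-contained.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch; uses java.util.logging instead of IgniteLogger. */
class GuardedTaskDemo {
    private static final Logger LOG = Logger.getLogger(GuardedTaskDemo.class.getName());

    /** Counts failed tasks; exists only so that the behaviour can be observed. */
    static final AtomicInteger FAILURES = new AtomicInteger();

    /** Submits the task so that any Throwable it throws is logged as an error. */
    static void executeGuarded(Executor executor, Runnable task, String description) {
        executor.execute(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                FAILURES.incrementAndGet();

                // SEVERE is the java.util.logging counterpart of an error-level message.
                LOG.log(Level.SEVERE, "Task failed: " + description, e);
            }
        });
    }
}
```

   The sketch swallows the exception after logging it; whether the real handler should rethrow is a separate decision that this example does not try to settle.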





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1059351763


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {

Review Comment:
   Renamed to `aliveNode()`; also renamed `aliveNodes()` to `runningNodes()` to avoid confusion.





[GitHub] [ignite-3] ibessonov merged pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
ibessonov merged PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461




[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055313572


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Why not pass it to the constructor?



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request. */

Review Comment:
   Please indicate the time unit: seconds, milliseconds, nanoseconds, or something else?
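
   One way to make the unit explicit is sketched below. It is an illustration only: `TimeoutUnitsDemo` and its members are hypothetical and do not use the real configuration annotations. The choice of milliseconds is an assumption, based on the integration test in this PR that sets `rpcInstallSnapshotTimeout` to 10000 and describes it as 10 seconds.

```java
import java.time.Duration;

/** Hypothetical holder, not the real RaftConfigurationSchema. */
class TimeoutUnitsDemo {
    /** RPC timeout for InstallSnapshot request (in milliseconds). */
    static final int RPC_INSTALL_SNAPSHOT_TIMEOUT_MS = 10_000;

    /** Converts the raw value to a Duration so that callers cannot misread the unit. */
    static Duration rpcInstallSnapshotTimeout() {
        return Duration.ofMillis(RPC_INSTALL_SNAPSHOT_TIMEOUT_MS);
    }
}
```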



##########
modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java:
##########
@@ -1074,9 +1075,16 @@ private MetaStorageService prepareMetaStorage() throws Exception {
     private CompletableFuture<RaftGroupService> startRaftService(
             ClusterService node, PeersAndLearners configuration
     ) throws NodeStoppingException {
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Why not use `@InjectConfiguration`?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java:
##########
@@ -188,10 +188,12 @@ void test() throws Exception {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework

Review Comment:
   I think I confused you; you need to specify **IGNITE-18030** here.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {

Review Comment:
   It is used in only one place, so I don't think it should be extracted into a separate method.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -78,15 +78,18 @@ public TxStateStorage txStatePartitionStorage() {
     public CompletableFuture<MvPartitionStorage> reCreateMvPartitionStorage() throws StorageException {
         assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        return mvTableStorage.destroyPartition(partId())
+        // TODO: IGNITE-18022 - actually recreate or do in a different way
+        //return mvTableStorage.destroyPartition(partId())
+        return CompletableFuture.completedFuture(null)
                 .thenApply(unused -> mvTableStorage.getOrCreateMvPartition(partId()));
     }
 
     @Override
     public TxStateStorage reCreateTxStatePartitionStorage() throws StorageException {
         assert txStateTableStorage.getTxStateStorage(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        txStateTableStorage.destroyTxStateStorage(partId());
+        // TODO: IGNITE-18022 - actually recreate or do in a different way

Review Comment:
   I think I confused you; the ticket to specify here is **IGNITE-18030**.



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,13 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (RuntimeException e) {

Review Comment:
   Let's catch `Throwable` here instead of `RuntimeException`.
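   
   A minimal sketch of the reasoning, under stated assumptions: `GuardedTaskSketch`, `guarded` and `runAll` are hypothetical names used only for illustration and are not part of `DefaultMessagingService`. A `catch (RuntimeException e)` around a task handed to an executor does not see `Error` subclasses (for example `AssertionError`), whereas `catch (Throwable t)` does:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not the DefaultMessagingService code.
class GuardedTaskSketch {
    /** Wraps a task so that anything it throws, including Error subclasses, is recorded. */
    static Runnable guarded(Runnable task, List<Throwable> failures) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // A real service would log here; the sketch just records the failure.
                failures.add(t);
            }
        };
    }

    /** Runs the tasks one by one on a single-threaded executor and returns what they threw. */
    static List<Throwable> runAll(Runnable... tasks) {
        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (Runnable task : tasks) {
            executor.execute(guarded(task, failures));
        }

        executor.shutdown();

        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        }

        return failures;
    }
}
```

   Note that catching `Throwable` also intercepts errors such as `OutOfMemoryError`, so the handler should stay limited to reporting the failure.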



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {
+        return workDir.resolve(nodeName);
+    }
+
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {
+        cluster.startAndInit(1);
+
+        doInSession(session -> {
+            executeUpdate("create table test (key int primary key, value varchar(20)) with partitions=1, replicas=1", session);
+
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        cluster.restartNode(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    private void doInSession(Consumer<Session> action) {
+        try (Session session = cluster.openSession()) {
+            action.accept(session);
+        }
+    }
+
+    private <T> T doInSession(Function<Session, T> action) {
+        try (Session session = cluster.openSession()) {
+            return action.apply(session);
+        }
+    }
+
+    private static void executeUpdate(String sql, Session session) {
+        executeUpdate(sql, session, null);
+    }
+
+    private static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
+        try (ResultSet ignored = session.execute(transaction, sql)) {
+            // Do nothing, just adhere to the syntactic ceremony...
+        }
+    }
+
+    private static <T> T withRetry(Supplier<T> action) {
+        // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                return action.get();
+            } catch (RuntimeException e) {
+                if (attempt < maxAttempts && isTransientFailure(e)) {
+                    LOG.warn("Attempt " + attempt + " failed, going to retry", e);
+                } else {
+                    throw e;
+                }
+            }
+
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                fail("Interrupted while waiting for next attempt");
+            }
+        }
+
+        throw new AssertionError("Should not reach here");
+    }
+
+    private static boolean isTransientFailure(RuntimeException e) {
+        return IgniteTestUtils.hasCause(e, ReplicationTimeoutException.class, null)
+                || IgniteTestUtils.hasCause(e, IgniteInternalException.class, "Failed to send message to node");
+    }
+
+    private <T> T query(String sql, Function<ResultSet, T> extractor) {
+        return doInSession(session -> {
+            try (ResultSet resultSet = session.execute(null, sql)) {
+                return extractor.apply(resultSet);
+            }
+        });
+    }
+
+    private <T> T queryWithRetry(String sql, Function<ResultSet, T> extractor) {
+        return withRetry(() -> query(sql, extractor));
+    }
+
+    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs) {
+        List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
+
+        while (rs.hasNext()) {
+            SqlRow sqlRow = rs.next();
+
+            rows.add(new IgniteBiTuple<>(sqlRow.intValue(0), sqlRow.stringValue(1)));
+        }
+
+        return rows;
+    }
+
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is fixed")
+    void leaderFeedsFollowerWithSnapshotWithKnockoutStop() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.STOP, DEFAULT_STORAGE_ENGINE);
+    }
+
+    @Test
+    void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String storageEngine) throws Exception {
+        feedNode2WithSnapshotOfOneRow(knockout, storageEngine);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout) throws InterruptedException {
+        feedNode2WithSnapshotOfOneRow(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine);
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+    }
+
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(storageEngine);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        cluster.knockOutNode(2, knockout);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+    }
+
+    private void createTestTableWith3Replicas(String storageEngine) throws InterruptedException {
+        String sql = "create table test (key int primary key, value varchar(20)) engine " + storageEngine
+                + " with partitions=1, replicas=3";
+
+        doInSession(session -> {
+            executeUpdate(sql, session);
+        });
+
+        waitForTableToStart();
+    }
+
+    private void waitForTableToStart() throws InterruptedException {
+        // TODO: IGNITE-18203 - remove this waiting because when a table creation query is executed, the table must be fully ready.
+
+        BooleanSupplier tableStarted = () -> {
+            int numberOfStartedRaftNodes = cluster.aliveNodes()
+                    .map(ItTableRaftSnapshotsTest::tablePartitionIds)
+                    .mapToInt(List::size)
+                    .sum();
+            return numberOfStartedRaftNodes == 3;
+        };
+
+        assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all table RAFT nodes started");
+    }
+
+    private void causeLogTruncationOnSolePartitionLeader() throws InterruptedException {
+        // Doing this twice because the first snapshot creation does not trigger log truncation.
+        doSnapshotOnSolePartitionLeader();
+        doSnapshotOnSolePartitionLeader();
+    }
+
+    private void doSnapshotOnSolePartitionLeader() throws InterruptedException {
+        TablePartitionId tablePartitionId = solePartitionId();
+
+        doSnapshotOn(tablePartitionId);
+    }
+
+    private TablePartitionId solePartitionId() {
+        List<TablePartitionId> tablePartitionIds = tablePartitionIds(cluster.entryNode());
+
+        assertThat(tablePartitionIds.size(), is(1));
+
+        return tablePartitionIds.get(0);
+    }
+
+    private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+        return node.raftManager().localNodes().stream()
+                .map(RaftNodeId::groupId)
+                .filter(TablePartitionId.class::isInstance)
+                .map(TablePartitionId.class::cast)
+                .collect(toList());
+    }
+
+    private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedException {
+        RaftGroupService raftGroupService = cluster.leaderServiceFor(tablePartitionId);
+
+        CountDownLatch snapshotLatch = new CountDownLatch(1);
+        AtomicReference<Status> snapshotStatus = new AtomicReference<>();
+
+        raftGroupService.getRaftNode().snapshot(status -> {
+            snapshotStatus.set(status);
+            snapshotLatch.countDown();
+        });
+
+        assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time");
+
+        assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get());
+    }
+
+    private void reanimateNode2AndWaitForSnapshotInstalled(NodeKnockout knockout) throws InterruptedException {
+        reanimateNodeAndWaitForSnapshotInstalled(2, knockout);
+    }
+
+    private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, NodeKnockout knockout) throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
+
+        Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
+
+        var handler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().matches("Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true")) {
+                    snapshotInstalledLatch.countDown();
+                }
+            }
+        };
+
+        replicatorLogger.addHandler(handler);
+
+        try {
+            cluster.reanimateNode(nodeIndex, knockout);
+
+            assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
+        } finally {
+            replicatorLogger.removeHandler(handler);
+        }
+    }
+
+    private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException {
+        String nodeConsistentId = cluster.node(nodeIndex).node().name();
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            boolean transferred = tryTransferLeadershipOnSolePartitionTo(nodeConsistentId);
+
+            if (transferred) {
+                break;
+            }
+
+            if (attempt < maxAttempts) {
+                LOG.info("Did not transfer leadership after " + attempt + " attempts, going to retry...");
+            } else {
+                fail("Did not transfer leadership in time after " + maxAttempts + " attempts");
+            }
+        }
+    }
+
+    private boolean tryTransferLeadershipOnSolePartitionTo(String targetLeaderConsistentId) throws InterruptedException {
+        NodeImpl leaderBeforeTransfer = (NodeImpl) cluster.leaderServiceFor(solePartitionId()).getRaftNode();
+
+        initiateLeadershipTransferTo(targetLeaderConsistentId, leaderBeforeTransfer);
+
+        BooleanSupplier leaderTransferred = () -> {
+            PeerId leaderId = leaderBeforeTransfer.getLeaderId();
+            return leaderId != null && leaderId.getConsistentId().equals(targetLeaderConsistentId);
+        };
+
+        return waitForCondition(leaderTransferred, 10_000);
+    }
+
+    private static void initiateLeadershipTransferTo(String targetLeaderConsistentId, NodeImpl leaderBeforeTransfer) {
+        long startedMillis = System.currentTimeMillis();
+
+        while (true) {
+            Status status = leaderBeforeTransfer.transferLeadershipTo(new PeerId(targetLeaderConsistentId));
+
+            if (status.getRaftError() != RaftError.EBUSY) {
+                break;
+            }
+
+            if (System.currentTimeMillis() - startedMillis > 10_000) {
+                throw new IllegalStateException("Could not initiate leadership transfer to " + targetLeaderConsistentId + " in time");
+            }
+        }
+    }
+
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is resolved")
+    void txSemanticsIsMaintainedWithKnockoutStop() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.STOP);
+    }
+
+    @Test
+    void txSemanticsIsMaintainedWithKnockoutPartitionNetwork() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.PARTITION_NETWORK);
+    }
+
+    private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockout) throws Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        Transaction tx = cluster.entryNode().transactions().begin();
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session, tx);
+
+            cluster.knockOutNode(2, knockout);
+
+            tx.commit();
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            RocksDbStorageEngine.ENGINE_NAME,
+            PersistentPageMemoryStorageEngine.ENGINE_NAME,
+            VolatilePageMemoryStorageEngine.ENGINE_NAME
+    })
+    void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.DEFAULT, storageEngine);
+    }
+
+    @Test
+    @Disabled("Enable when IGNITE-18432 is fixed")
+    void entriesKeepAddendedAfterSnapshotInstallation() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void entriesKeepAddendedDuringSnapshotInstallation() throws Exception {
+        NodeKnockout knockout = NodeKnockout.DEFAULT;
+
+        prepareClusterForInstallingSnapshotToNode2(knockout);
+
+        AtomicBoolean installedSnapshot = new AtomicBoolean(false);
+        AtomicInteger lastLoadedKey = new AtomicInteger();
+
+        CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
+            for (int i = 2; !installedSnapshot.get(); i++) {
+                int key = i;
+                doInSession(session -> {
+                    executeUpdate("insert into test(key, value) values (" + key + ", 'extra')", session);
+                    lastLoadedKey.set(key);
+                });
+            }
+        });
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        installedSnapshot.set(true);
+
+        assertThat(loadingFuture, willSucceedIn(30, TimeUnit.SECONDS));
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<Integer> keys = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows)
+                .stream().map(IgniteBiTuple::get1).collect(toList());
+
+        assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+    }
+
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        cluster.knockOutNode(0, NodeKnockout.DEFAULT);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNodeAndWaitForSnapshotInstalled(0, NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    private class Cluster {
+        /** Base port number. */
+        private static final int BASE_PORT = 3344;
+
+        /**
+         * Nodes bootstrap configuration pattern.
+         *
+         * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+         * allow tests to pass thanks to retries.
+         */
+        private static final String NODE_BOOTSTRAP_CFG = "{\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  \"raft\": {"
+                + "    \"rpcInstallSnapshotTimeout\": 10000"
+                + "  }"
+                + "}";
+
+        private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+        private final TestInfo testInfo;
+
+        /** Cluster nodes. */
+        private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+        private volatile boolean started = false;
+
+        private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+        private Cluster(TestInfo testInfo) {
+            this.testInfo = testInfo;
+        }
+
+        void startAndInit(int nodeCount) {
+            if (started) {
+                throw new IllegalStateException("The cluster is already started");
+            }
+
+            List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                    .mapToObj(this::startClusterNode)
+                    .collect(toList());
+
+            String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+            IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+            for (CompletableFuture<IgniteImpl> future : futures) {
+                assertThat(future, willCompleteSuccessfully());
+
+                nodes.add(future.join());
+            }
+
+            started = true;
+        }
+
+        private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+            String nodeName = testNodeName(testInfo, nodeIndex);
+
+            String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+            return IgnitionManager.start(nodeName, config, nodeWorkDir(nodeName))
+                    .thenApply(IgniteImpl.class::cast);
+        }
+
+        IgniteImpl node(int index) {
+            return nodes.get(index);
+        }
+
+        /**
+         * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+         */
+        IgniteImpl entryNode() {
+            return IntStream.range(0, nodes.size())
+                    .filter(index -> nodes.get(index) != null)
+                    .filter(index -> !knockedOutIndices.contains(index))
+                    .mapToObj(nodes::get)
+                    .findAny()
+                    .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+        }
+
+        void stopNode(int index) {
+            IgnitionManager.stop(nodes.get(index).name());
+
+            nodes.set(index, null);
+        }
+
+        void restartNode(int index) {
+            stopNode(index);
+
+            startNode(index);
+        }
+
+        void startNode(int index) {
+            IgniteImpl newIgniteNode;
+
+            try {
+                newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new RuntimeException(e);
+            }
+
+            nodes.set(index, newIgniteNode);
+        }
+
+        RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+            AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+            assertTrue(
+                    waitForCondition(() -> {
+                        RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                        serviceRef.set(service);
+
+                        return service != null;
+                    }, 10_000),
+                    "Did not find a leader for " + tablePartitionId + " in time"
+            );
+
+            RaftGroupService result = serviceRef.get();
+
+            assertNotNull(result);
+
+            return result;
+        }
+
+        @Nullable
+        private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+            return aliveNodes()
+                    .map(IgniteImpl.class::cast)
+                    .map(ignite -> {
+                        JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                        Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                                .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                                .findAny();
+
+                        return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                    })
+                    .filter(Objects::nonNull)
+                    .filter(service -> service.getRaftNode().isLeader())
+                    .findAny()
+                    .orElse(null);
+        }
+
+        private Stream<IgniteImpl> aliveNodes() {
+            return nodes.stream().filter(Objects::nonNull);
+        }
+
+        private Session openSession() {
+            return entryNode().sql()
+                    .sessionBuilder()
+                    .defaultSchema("PUBLIC")
+                    .defaultQueryTimeout(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                    .build();
+        }
+
+        void shutdown() {
+            aliveNodes().forEach(node -> IgnitionManager.stop(node.name()));
+        }
+
+        private void knockOutNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.knockOutNode(nodeIndex, this);
+
+            knockedOutIndices.add(nodeIndex);
+        }
+
+        private void reanimateNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.reanimateNode(nodeIndex, this);
+
+            knockedOutIndices.remove(nodeIndex);
+        }
+    }
+
+    /**
+     * A way to make a node be separated from a cluster and stop receiving updates.
+     */
+    private enum NodeKnockout {

Review Comment:
   Why enum and not just methods and functions?
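
For context on the question above, here is a minimal sketch of what an enum buys over free-standing methods: each constant carries a matched knock-out/reanimate pair, so a test can be parameterized over `values()`. This is an illustration only. `FakeCluster` and the constants `STOP` and `PARTITION` are hypothetical names, not taken from the PR; only the two-method shape mirrors `NodeKnockout`.

```java
import java.util.ArrayList;
import java.util.List;

public class KnockoutSketch {
    /** Hypothetical stand-in for the test's Cluster; it only records what was done to it. */
    static class FakeCluster {
        final List<String> events = new ArrayList<>();
    }

    /** Each constant pairs a way to take a node out with the matching way to bring it back. */
    enum Knockout {
        STOP {
            @Override
            void knockOut(int index, FakeCluster cluster) {
                cluster.events.add("stop " + index);
            }

            @Override
            void reanimate(int index, FakeCluster cluster) {
                cluster.events.add("start " + index);
            }
        },
        PARTITION {
            @Override
            void knockOut(int index, FakeCluster cluster) {
                cluster.events.add("drop messages to " + index);
            }

            @Override
            void reanimate(int index, FakeCluster cluster) {
                cluster.events.add("deliver messages to " + index);
            }
        };

        abstract void knockOut(int index, FakeCluster cluster);

        abstract void reanimate(int index, FakeCluster cluster);
    }

    /** Runs one knock-out/reanimate cycle on node 2 and returns the recorded events. */
    static List<String> run(Knockout knockout) {
        FakeCluster cluster = new FakeCluster();
        knockout.knockOut(2, cluster);
        knockout.reanimate(2, cluster);
        return cluster.events;
    }

    public static void main(String[] args) {
        // The same scenario is exercised once per strategy.
        for (Knockout knockout : Knockout.values()) {
            System.out.println(knockout + " -> " + run(knockout));
        }
    }
}
```

A pair of lambdas or a small interface would work as well; the enum mainly gives the strategies stable names that show up in parameterized test output.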



##########
modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java:
##########
@@ -84,6 +86,11 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
 
     @BeforeEach
     public void setUp(TestInfo testInfo) {
+        ConfigurationValue<Integer> rpcInstallSnapshotTimeutValue = mock(ConfigurationValue.class);

Review Comment:
   Why not use `@InjectConfiguration`?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java:
##########
@@ -188,10 +188,12 @@ void test() throws Exception {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework
+        //verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
         verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
 
-        verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework

Review Comment:
   I think I confused you earlier; the ticket to reference here is **IGNITE-18030**.



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -201,6 +201,11 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Network
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;
+        if (dropMessage != null && dropMessage.test(recipient.name(), msg)) {
+            return new CompletableFuture<NetworkMessage>().orTimeout(10, TimeUnit.MILLISECONDS);

Review Comment:
   The documentation of `org.apache.ignite.network.DefaultMessagingService#dropMessages` says that the future will never be completed, but here it is completed (exceptionally, once the 10 ms timeout fires).
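
The difference the comment points at can be reproduced with the JDK alone. The sketch below is not code from the PR; the class and method names are made up for illustration. It compares a plain `CompletableFuture`, which stays pending, with one armed via `orTimeout`, which completes exceptionally with a `TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DroppedMessageFutureSketch {
    /** Describes how a future armed with orTimeout(10 ms) ends up. */
    static String outcomeOfTimedFuture() {
        CompletableFuture<String> future = new CompletableFuture<String>().orTimeout(10, TimeUnit.MILLISECONDS);

        try {
            // Generous wait: the 10 ms timeout should fire long before this expires.
            future.get(5, TimeUnit.SECONDS);

            return "completed normally";
        } catch (ExecutionException e) {
            // orTimeout() completes the future exceptionally; get() wraps the cause.
            return e.getCause().getClass().getSimpleName();
        } catch (TimeoutException e) {
            return "still pending";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }
    }

    /** Returns true if a future that nobody completes is still pending after a short wait. */
    static boolean plainFutureStaysPending() {
        CompletableFuture<String> future = new CompletableFuture<>();

        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }

        return !future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("orTimeout future: " + outcomeOfTimedFuture());
        System.out.println("plain future stays pending: " + plainFutureStaysPending());
    }
}
```

Whether the Javadoc or the behaviour should change is up to the PR author; the sketch only shows that the two currently disagree.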



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -78,15 +78,18 @@ public TxStateStorage txStatePartitionStorage() {
     public CompletableFuture<MvPartitionStorage> reCreateMvPartitionStorage() throws StorageException {
         assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        return mvTableStorage.destroyPartition(partId())
+        // TODO: IGNITE-18022 - actually recreate or do in a different way

Review Comment:
   I think I confused you earlier; the ticket to reference here is **IGNITE-18030**.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;

Review Comment:
   I think it's worth testing all storage engines, not only the default one.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {

Review Comment:
   Why not use `org.apache.ignite.internal.AbstractClusterIntegrationTest`?
   
   There is no documentation for the methods and tests, and it is not quite clear to me what is being tested and what we expect. Please add documentation.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {
+        return workDir.resolve(nodeName);
+    }
+
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {

Review Comment:
   I don't quite understand what is being tested here; it looks more like local recovery.



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,13 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (RuntimeException e) {
+                    LOG.warn("onMessage() failed while processing " + obj.message() + " from " + obj.consistentId(), e);

Review Comment:
   Let's log on debug level with `Supplier` or `IgniteStringFormatter`.
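
A minimal sketch of the `Supplier` variant suggested above. `IgniteLogger`'s own API does not appear in this thread, so the example uses `java.util.logging` as a stand-in (an assumption made purely for illustration), with `Level.FINE` playing the role of debug. The point is that the message string is built only when the level is enabled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyLogSketch {
    /** Logs one FINE record through a Supplier and reports how many times the message was built. */
    static int messagesBuilt(Level loggerLevel) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the demo quiet
        log.setLevel(loggerLevel);

        AtomicInteger built = new AtomicInteger();

        Supplier<String> message = () -> {
            built.incrementAndGet();

            // Placeholder text; the real message would concatenate obj.message() and obj.consistentId().
            return "onMessage() failed while processing <message> from <sender>";
        };

        // The supplier is invoked only if FINE is loggable for this logger.
        log.log(Level.FINE, new RuntimeException("boom"), message);

        return built.get();
    }

    public static void main(String[] args) {
        System.out.println("built with FINE enabled: " + messagesBuilt(Level.FINE));
        System.out.println("built with INFO only: " + messagesBuilt(Level.INFO));
    }
}
```

With eager string concatenation the message would be built on every failure regardless of the configured level.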





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055648873


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {

Review Comment:
   `AbstractClusterIntegrationTest` is more rigid: it starts and initializes the cluster in a `@BeforeEach` method. Here, we need more control over when nodes are started and how many of them.
   
   Added javadocs, please let me know if something else should be added.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057637109


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {

Review Comment:
   I made `Cluster` a top-level class and prepared it for use by `AbstractClusterIntegrationTest`. Migrating that test (and its subclasses) to `Cluster` requires work that is out of scope for this PR, so I created a ticket, https://issues.apache.org/jira/browse/IGNITE-18465, and mentioned it in the TODOs.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058233695


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {

Review Comment:
   I think this class should be in `testFixtures`.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058264078


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {

Review Comment:
   Any suggestions?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058237659


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {

Review Comment:
   Not quite the right name.
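
   For reference while picking a name, the behavior described in the javadoc (return a node that is neither stopped nor knocked out) could be sketched as below. This is only an illustration under assumptions: `EntryNodeSketch` and `firstUsableNode` are hypothetical names, and representing a stopped node as a `null` list element is an assumption, not something taken from the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

/** Hypothetical sketch, not the PR's Cluster implementation. */
public final class EntryNodeSketch {
    /**
     * Returns the first node that is neither stopped nor knocked out.
     * Assumption: a stopped node is represented by a null element in the list.
     */
    public static <T> T firstUsableNode(List<T> nodes, Set<Integer> knockedOutIndices) {
        return IntStream.range(0, nodes.size())
                // Skip stopped nodes (null slots, by assumption).
                .filter(i -> nodes.get(i) != null)
                // Skip nodes that are knocked out.
                .filter(i -> !knockedOutIndices.contains(i))
                .mapToObj(nodes::get)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "There is no node that is neither stopped nor knocked out"));
    }

    public static void main(String[] args) {
        // Node 0 is stopped, node 1 is knocked out, so node 2 is picked.
        List<String> nodes = Arrays.asList(null, "node-1", "node-2");
        System.out.println(firstUsableNode(nodes, Set.of(1)));
    }
}
```

   A name that mentions what the method filters on may read better than one that suggests a fixed, designated node.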





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058228930


##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   Will this be enough time if the partitions are large, for example 50 GB?
   I ask because I think it will not be obvious to users how to configure this parameter, especially on the fly.
   I say this from experience with 2.0.
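
   To put the question in numbers, here is a rough back-of-the-envelope sketch. It assumes the timeout has to cover the transfer of the whole partition, treats 50 GB as 50 GiB, and ignores the time needed to apply the snapshot. `SnapshotTimeoutMath` and `requiredMibPerSecond` are hypothetical names, not part of Ignite.

```java
/** Hypothetical helper for estimating the throughput a snapshot timeout implies. */
public final class SnapshotTimeoutMath {
    /** Default from the diff above: 5 * 60 * 1000 milliseconds. */
    public static final long DEFAULT_TIMEOUT_MS = 5 * 60 * 1000;

    /** Minimum sustained throughput (MiB/s) needed to move sizeGib within timeoutMs. */
    public static double requiredMibPerSecond(long sizeGib, long timeoutMs) {
        double sizeMib = sizeGib * 1024.0;
        double timeoutSeconds = timeoutMs / 1000.0;
        return sizeMib / timeoutSeconds;
    }

    public static void main(String[] args) {
        double rate = requiredMibPerSecond(50, DEFAULT_TIMEOUT_MS);
        System.out.println("50 GiB within " + DEFAULT_TIMEOUT_MS + " ms needs at least "
                + rate + " MiB/s");
    }
}
```

   Under these assumptions, the 5-minute default requires roughly 170 MiB/s of sustained throughput for a 50 GiB partition (51200 MiB / 300 s).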





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1058306650


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */

Review Comment:
   Renamed the field to make it more explicit.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1057638279


##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   The flow is: the leader makes a request, the snapshot gets installed, and then the leader gets a response to that request. The timeout covers the whole duration of this round trip, including both the transfer and the installation of the snapshot. I expanded the javadoc.
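
   As an illustration only (this is not the JRaft implementation), the sketch below models a single deadline that covers the whole round trip. `RoundTripTimeoutSketch`, `installSnapshot` and `completesWithin` are hypothetical names, and the transfer and installation phases are simulated with sleeps.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a request whose timeout spans transfer plus installation. */
public final class RoundTripTimeoutSketch {
    /** Simulated follower: receives the snapshot, installs it, then responds. */
    static CompletableFuture<String> installSnapshot(long transferMs, long installMs) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(transferMs); // transfer of the snapshot data
            sleep(installMs);  // installation of the snapshot
            return "OK";       // the response is sent only after both phases
        });
    }

    /** Simulated leader: waits for the response under one deadline for the whole round trip. */
    public static boolean completesWithin(long transferMs, long installMs, long timeoutMs) {
        try {
            installSnapshot(transferMs, installMs).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(10, 10, 2000));
        System.out.println(completesWithin(300, 300, 400));
    }
}
```

   In the second call each phase alone would fit into the timeout, but their sum does not, so the request times out.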





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055475563


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {
+        return workDir.resolve(nodeName);
+    }
+
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {

Review Comment:
   JRaft 'reads' a snapshot on start; this method tested that this 'read' happens successfully. But you are right, this is also tested elsewhere, so I removed the test.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

Posted by GitBox <gi...@apache.org>.
rpuch commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055447761


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {
+        return workDir.resolve(nodeName);
+    }
+
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {
+        cluster.startAndInit(1);
+
+        doInSession(session -> {
+            executeUpdate("create table test (key int primary key, value varchar(20)) with partitions=1, replicas=1", session);
+
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        cluster.restartNode(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    private void doInSession(Consumer<Session> action) {
+        try (Session session = cluster.openSession()) {
+            action.accept(session);
+        }
+    }
+
+    private <T> T doInSession(Function<Session, T> action) {
+        try (Session session = cluster.openSession()) {
+            return action.apply(session);
+        }
+    }
+
+    private static void executeUpdate(String sql, Session session) {
+        executeUpdate(sql, session, null);
+    }
+
+    private static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
+        try (ResultSet ignored = session.execute(transaction, sql)) {
+            // Do nothing, just adhere to the syntactic ceremony...
+        }
+    }
+
+    private static <T> T withRetry(Supplier<T> action) {
+        // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                return action.get();
+            } catch (RuntimeException e) {
+                if (attempt < maxAttempts && isTransientFailure(e)) {
+                    LOG.warn("Attempt " + attempt + " failed, going to retry", e);
+                } else {
+                    throw e;
+                }
+            }
+
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                fail("Interrupted while waiting for next attempt");
+            }
+        }
+
+        throw new AssertionError("Should not reach here");
+    }
+
+    private static boolean isTransientFailure(RuntimeException e) {
+        return IgniteTestUtils.hasCause(e, ReplicationTimeoutException.class, null)
+                || IgniteTestUtils.hasCause(e, IgniteInternalException.class, "Failed to send message to node");
+    }
+
+    private <T> T query(String sql, Function<ResultSet, T> extractor) {
+        return doInSession(session -> {
+            try (ResultSet resultSet = session.execute(null, sql)) {
+                return extractor.apply(resultSet);
+            }
+        });
+    }
+
+    private <T> T queryWithRetry(String sql, Function<ResultSet, T> extractor) {
+        return withRetry(() -> query(sql, extractor));
+    }
+
+    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs) {
+        List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
+
+        while (rs.hasNext()) {
+            SqlRow sqlRow = rs.next();
+
+            rows.add(new IgniteBiTuple<>(sqlRow.intValue(0), sqlRow.stringValue(1)));
+        }
+
+        return rows;
+    }
+
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is fixed")
+    void leaderFeedsFollowerWithSnapshotWithKnockoutStop() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.STOP, DEFAULT_STORAGE_ENGINE);
+    }
+
+    @Test
+    void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String storageEngine) throws Exception {
+        feedNode2WithSnapshotOfOneRow(knockout, storageEngine);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout) throws InterruptedException {
+        feedNode2WithSnapshotOfOneRow(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine);
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+    }
+
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(storageEngine);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        cluster.knockOutNode(2, knockout);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+    }
+
+    private void createTestTableWith3Replicas(String storageEngine) throws InterruptedException {
+        String sql = "create table test (key int primary key, value varchar(20)) engine " + storageEngine
+                + " with partitions=1, replicas=3";
+
+        doInSession(session -> {
+            executeUpdate(sql, session);
+        });
+
+        waitForTableToStart();
+    }
+
+    private void waitForTableToStart() throws InterruptedException {
+        // TODO: IGNITE-18203 - remove this waiting because when a table creation query is executed, the table must be fully ready.
+
+        BooleanSupplier tableStarted = () -> {
+            int numberOfStartedRaftNodes = cluster.aliveNodes()
+                    .map(ItTableRaftSnapshotsTest::tablePartitionIds)
+                    .mapToInt(List::size)
+                    .sum();
+            return numberOfStartedRaftNodes == 3;
+        };
+
+        assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all table RAFT nodes started");
+    }
+
+    private void causeLogTruncationOnSolePartitionLeader() throws InterruptedException {
+        // Doing this twice because first snapshot creation does not trigger log truncation.
+        doSnapshotOnSolePartitionLeader();
+        doSnapshotOnSolePartitionLeader();
+    }
+
+    private void doSnapshotOnSolePartitionLeader() throws InterruptedException {
+        TablePartitionId tablePartitionId = solePartitionId();
+
+        doSnapshotOn(tablePartitionId);
+    }
+
+    private TablePartitionId solePartitionId() {
+        List<TablePartitionId> tablePartitionIds = tablePartitionIds(cluster.entryNode());
+
+        assertThat(tablePartitionIds.size(), is(1));
+
+        return tablePartitionIds.get(0);
+    }
+
+    private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+        return node.raftManager().localNodes().stream()
+                .map(RaftNodeId::groupId)
+                .filter(TablePartitionId.class::isInstance)
+                .map(TablePartitionId.class::cast)
+                .collect(toList());
+    }
+
+    private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedException {
+        RaftGroupService raftGroupService = cluster.leaderServiceFor(tablePartitionId);
+
+        CountDownLatch snapshotLatch = new CountDownLatch(1);
+        AtomicReference<Status> snapshotStatus = new AtomicReference<>();
+
+        raftGroupService.getRaftNode().snapshot(status -> {
+            snapshotStatus.set(status);
+            snapshotLatch.countDown();
+        });
+
+        assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time");
+
+        assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get());
+    }
+
+    private void reanimateNode2AndWaitForSnapshotInstalled(NodeKnockout knockout) throws InterruptedException {
+        reanimateNodeAndWaitForSnapshotInstalled(2, knockout);
+    }
+
+    private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, NodeKnockout knockout) throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
+
+        Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
+
+        var handler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().matches("Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true")) {
+                    snapshotInstalledLatch.countDown();
+                }
+            }
+        };
+
+        replicatorLogger.addHandler(handler);
+
+        try {
+            cluster.reanimateNode(nodeIndex, knockout);
+
+            assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
+        } finally {
+            replicatorLogger.removeHandler(handler);
+        }
+    }
+
+    private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException {
+        String nodeConsistentId = cluster.node(nodeIndex).node().name();
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            boolean transferred = tryTransferLeadershipOnSolePartitionTo(nodeConsistentId);
+
+            if (transferred) {
+                break;
+            }
+
+            if (attempt < maxAttempts) {
+                LOG.info("Did not transfer leadership after " + attempt + " attempts, going to retry...");
+            } else {
+                fail("Did not transfer leadership in time after " + maxAttempts + " attempts");
+            }
+        }
+    }
+
+    private boolean tryTransferLeadershipOnSolePartitionTo(String targetLeaderConsistentId) throws InterruptedException {
+        NodeImpl leaderBeforeTransfer = (NodeImpl) cluster.leaderServiceFor(solePartitionId()).getRaftNode();
+
+        initiateLeadershipTransferTo(targetLeaderConsistentId, leaderBeforeTransfer);
+
+        BooleanSupplier leaderTransferred = () -> {
+            PeerId leaderId = leaderBeforeTransfer.getLeaderId();
+            return leaderId != null && leaderId.getConsistentId().equals(targetLeaderConsistentId);
+        };
+
+        return waitForCondition(leaderTransferred, 10_000);
+    }
+
+    private static void initiateLeadershipTransferTo(String targetLeaderConsistentId, NodeImpl leaderBeforeTransfer) {
+        long startedMillis = System.currentTimeMillis();
+
+        while (true) {
+            Status status = leaderBeforeTransfer.transferLeadershipTo(new PeerId(targetLeaderConsistentId));
+
+            if (status.getRaftError() != RaftError.EBUSY) {
+                break;
+            }
+
+            if (System.currentTimeMillis() - startedMillis > 10_000) {
+                throw new IllegalStateException("Could not initiate leadership transfer to " + targetLeaderConsistentId + " in time");
+            }
+        }
+    }
+
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is resolved")
+    void txSemanticsIsMaintainedWithKnockoutStop() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.STOP);
+    }
+
+    @Test
+    void txSemanticsIsMaintainedWithKnockoutPartitionNetwork() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.PARTITION_NETWORK);
+    }
+
+    private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockout) throws Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        Transaction tx = cluster.entryNode().transactions().begin();
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session, tx);
+
+            cluster.knockOutNode(2, knockout);
+
+            tx.commit();
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            RocksDbStorageEngine.ENGINE_NAME,
+            PersistentPageMemoryStorageEngine.ENGINE_NAME,
+            VolatilePageMemoryStorageEngine.ENGINE_NAME
+    })
+    void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.DEFAULT, storageEngine);
+    }
+
+    @Test
+    @Disabled("Enable when IGNITE-18432 is fixed")
+    void entriesKeepAddendedAfterSnapshotInstallation() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void entriesKeepAddendedDuringSnapshotInstallation() throws Exception {
+        NodeKnockout knockout = NodeKnockout.DEFAULT;
+
+        prepareClusterForInstallingSnapshotToNode2(knockout);
+
+        AtomicBoolean installedSnapshot = new AtomicBoolean(false);
+        AtomicInteger lastLoadedKey = new AtomicInteger();
+
+        CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
+            for (int i = 2; !installedSnapshot.get(); i++) {
+                int key = i;
+                doInSession(session -> {
+                    executeUpdate("insert into test(key, value) values (" + key + ", 'extra')", session);
+                    lastLoadedKey.set(key);
+                });
+            }
+        });
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        installedSnapshot.set(true);
+
+        assertThat(loadingFuture, willSucceedIn(30, TimeUnit.SECONDS));
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<Integer> keys = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows)
+                .stream().map(IgniteBiTuple::get1).collect(toList());
+
+        assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+    }
+
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        cluster.knockOutNode(0, NodeKnockout.DEFAULT);
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNodeAndWaitForSnapshotInstalled(0, NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    private class Cluster {
+        /** Base port number. */
+        private static final int BASE_PORT = 3344;
+
+        /**
+         * Nodes bootstrap configuration pattern.
+         *
+         * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+         * allow tests to pass thanks to retries.
+         */
+        private static final String NODE_BOOTSTRAP_CFG = "{\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  \"raft\": {"
+                + "    \"rpcInstallSnapshotTimeout\": 10000"
+                + "  }"
+                + "}";
+
+        private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+        private final TestInfo testInfo;
+
+        /** Cluster nodes. */
+        private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+        private volatile boolean started = false;
+
+        private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+        private Cluster(TestInfo testInfo) {
+            this.testInfo = testInfo;
+        }
+
+        void startAndInit(int nodeCount) {
+            if (started) {
+                throw new IllegalStateException("The cluster is already started");
+            }
+
+            List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                    .mapToObj(this::startClusterNode)
+                    .collect(toList());
+
+            String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+            IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+            for (CompletableFuture<IgniteImpl> future : futures) {
+                assertThat(future, willCompleteSuccessfully());
+
+                nodes.add(future.join());
+            }
+
+            started = true;
+        }
+
+        private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+            String nodeName = testNodeName(testInfo, nodeIndex);
+
+            String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+            return IgnitionManager.start(nodeName, config, nodeWorkDir(nodeName))
+                    .thenApply(IgniteImpl.class::cast);
+        }
+
+        IgniteImpl node(int index) {
+            return nodes.get(index);
+        }
+
+        /**
+         * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+         */
+        IgniteImpl entryNode() {
+            return IntStream.range(0, nodes.size())
+                    .filter(index -> nodes.get(index) != null)
+                    .filter(index -> !knockedOutIndices.contains(index))
+                    .mapToObj(nodes::get)
+                    .findAny()
+                    .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+        }
+
+        void stopNode(int index) {
+            IgnitionManager.stop(nodes.get(index).name());
+
+            nodes.set(index, null);
+        }
+
+        void restartNode(int index) {
+            stopNode(index);
+
+            startNode(index);
+        }
+
+        void startNode(int index) {
+            IgniteImpl newIgniteNode;
+
+            try {
+                newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new RuntimeException(e);
+            }
+
+            nodes.set(index, newIgniteNode);
+        }
+
+        RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+            AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+            assertTrue(
+                    waitForCondition(() -> {
+                        RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                        serviceRef.set(service);
+
+                        return service != null;
+                    }, 10_000),
+                    "Did not find a leader for " + tablePartitionId + " in time"
+            );
+
+            RaftGroupService result = serviceRef.get();
+
+            assertNotNull(result);
+
+            return result;
+        }
+
+        @Nullable
+        private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+            return aliveNodes()
+                    .map(IgniteImpl.class::cast)
+                    .map(ignite -> {
+                        JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                        Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                                .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                                .findAny();
+
+                        return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                    })
+                    .filter(Objects::nonNull)
+                    .filter(service -> service.getRaftNode().isLeader())
+                    .findAny()
+                    .orElse(null);
+        }
+
+        private Stream<IgniteImpl> aliveNodes() {
+            return nodes.stream().filter(Objects::nonNull);
+        }
+
+        private Session openSession() {
+            return entryNode().sql()
+                    .sessionBuilder()
+                    .defaultSchema("PUBLIC")
+                    .defaultQueryTimeout(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                    .build();
+        }
+
+        void shutdown() {
+            aliveNodes().forEach(node -> IgnitionManager.stop(node.name()));
+        }
+
+        private void knockOutNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.knockOutNode(nodeIndex, this);
+
+            knockedOutIndices.add(nodeIndex);
+        }
+
+        private void reanimateNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.reanimateNode(nodeIndex, this);
+
+            knockedOutIndices.remove(nodeIndex);
+        }
+    }
+
+    /**
+     * A way to make a node be separated from a cluster and stop receiving updates.
+     */
+    private enum NodeKnockout {

Review Comment:
   To make it usable with `@ParameterizedTest` via `@EnumSource`
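
A rough illustration of the point above, not the code from the PR: JUnit 5's `@EnumSource` hands each constant of an enum to a `@ParameterizedTest`, so modelling the knockout strategy as an enum lets one scenario run once per strategy. The plain-Java sketch below imitates that by looping over `values()`. The constant names `STOP` and `PARTITION_NETWORK` and the method names `knockOutNode`/`reanimateNode` come from the diff; `FakeCluster`, the method bodies and `runScenarioForEveryKnockout` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

final class EnumSourceSketch {
    /** Hypothetical stand-in for the test's Cluster: it only records what was done to it. */
    static final class FakeCluster {
        final List<String> events = new ArrayList<>();
    }

    /** Constant names are taken from the diff; the bodies are invented for illustration. */
    enum NodeKnockout {
        STOP {
            @Override
            void knockOutNode(int nodeIndex, FakeCluster cluster) {
                cluster.events.add("stop node " + nodeIndex);
            }

            @Override
            void reanimateNode(int nodeIndex, FakeCluster cluster) {
                cluster.events.add("start node " + nodeIndex);
            }
        },
        PARTITION_NETWORK {
            @Override
            void knockOutNode(int nodeIndex, FakeCluster cluster) {
                cluster.events.add("drop traffic of node " + nodeIndex);
            }

            @Override
            void reanimateNode(int nodeIndex, FakeCluster cluster) {
                cluster.events.add("restore traffic of node " + nodeIndex);
            }
        };

        abstract void knockOutNode(int nodeIndex, FakeCluster cluster);

        abstract void reanimateNode(int nodeIndex, FakeCluster cluster);
    }

    /** Runs the same scenario once per enum constant, as @EnumSource would do for a test method. */
    static List<String> runScenarioForEveryKnockout() {
        List<String> log = new ArrayList<>();

        for (NodeKnockout knockout : NodeKnockout.values()) {
            FakeCluster cluster = new FakeCluster();

            knockout.knockOutNode(2, cluster);
            knockout.reanimateNode(2, cluster);

            log.add(knockout + ": " + cluster.events);
        }

        return log;
    }

    public static void main(String[] args) {
        runScenarioForEveryKnockout().forEach(System.out::println);
    }
}
```

In an actual JUnit 5 test the loop would presumably be replaced by a method that takes a `NodeKnockout` parameter and is annotated with `@ParameterizedTest` and `@EnumSource(NodeKnockout.class)`.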


