You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/22 11:09:06 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

tkalkirill commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1055313572


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Why not pass it to the constructor?



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request. */

Review Comment:
   Please indicate the time unit: seconds, milliseconds, nanoseconds, or something else?



##########
modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java:
##########
@@ -1074,9 +1075,16 @@ private MetaStorageService prepareMetaStorage() throws Exception {
     private CompletableFuture<RaftGroupService> startRaftService(
             ClusterService node, PeersAndLearners configuration
     ) throws NodeStoppingException {
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Why not use `@InjectConfiguration`?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java:
##########
@@ -188,10 +188,12 @@ void test() throws Exception {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework

Review Comment:
   I think I confused you and you need to specify **IGNITE-18030**



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {

Review Comment:
   It is used in only one place, so I don't think it should be extracted into a separate method.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -78,15 +78,18 @@ public TxStateStorage txStatePartitionStorage() {
     public CompletableFuture<MvPartitionStorage> reCreateMvPartitionStorage() throws StorageException {
         assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        return mvTableStorage.destroyPartition(partId())
+        // TODO: IGNITE-18022 - actually recreate or do in a different way
+        //return mvTableStorage.destroyPartition(partId())
+        return CompletableFuture.completedFuture(null)
                 .thenApply(unused -> mvTableStorage.getOrCreateMvPartition(partId()));
     }
 
     @Override
     public TxStateStorage reCreateTxStatePartitionStorage() throws StorageException {
         assert txStateTableStorage.getTxStateStorage(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        txStateTableStorage.destroyTxStateStorage(partId());
+        // TODO: IGNITE-18022 - actually recreate or do in a different way

Review Comment:
   I think I confused you and you need to specify **IGNITE-18030**



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,13 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (RuntimeException e) {

Review Comment:
   Let's catch class `Throwable`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    /** Resolves the given node name against {@code workDir} and returns the resulting path. */
+    private Path nodeWorkDir(String nodeName) {
+        Path nodeDir = workDir.resolve(nodeName);
+
+        return nodeDir;
+    }
+
+    /**
+     * Starts a single-node cluster, creates a one-partition, one-replica table with a single row, restarts the node
+     * and checks that a query (with retries) still returns exactly that row.
+     */
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {
+        cluster.startAndInit(1);
+
+        doInSession(session -> {
+            executeUpdate("create table test (key int primary key, value varchar(20)) with partitions=1, replicas=1", session);
+
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        cluster.restartNode(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    /**
+     * Opens a session on the cluster, passes it to the given action and closes the session afterwards.
+     *
+     * @param action Action to run with the open session.
+     */
+    private void doInSession(Consumer<Session> action) {
+        try (Session session = cluster.openSession()) {
+            action.accept(session);
+        }
+    }
+
+    /**
+     * Opens a session on the cluster, applies the given function to it and closes the session afterwards.
+     *
+     * @param action Function to apply to the open session.
+     * @param <T> Type of the result.
+     * @return Whatever the function returned.
+     */
+    private <T> T doInSession(Function<Session, T> action) {
+        try (Session session = cluster.openSession()) {
+            return action.apply(session);
+        }
+    }
+
+    /**
+     * Executes the given SQL statement in the session, passing {@code null} as the transaction.
+     *
+     * @param sql SQL statement text.
+     * @param session Session to execute the statement in.
+     */
+    private static void executeUpdate(String sql, Session session) {
+        executeUpdate(sql, session, null);
+    }
+
+    /**
+     * Executes the given SQL statement in the session with the given transaction and closes the returned result set
+     * without reading it.
+     *
+     * @param sql SQL statement text.
+     * @param session Session to execute the statement in.
+     * @param transaction Transaction to pass to the session, or {@code null}.
+     */
+    private static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
+        try (ResultSet ignored = session.execute(transaction, sql)) {
+            // Do nothing, just adhere to the syntactic ceremony...
+        }
+    }
+
+    private static <T> T withRetry(Supplier<T> action) {
+        // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                return action.get();
+            } catch (RuntimeException e) {
+                if (attempt < maxAttempts && isTransientFailure(e)) {
+                    LOG.warn("Attempt " + attempt + " failed, going to retry", e);
+                } else {
+                    throw e;
+                }
+            }
+
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                fail("Interrupted while waiting for next attempt");
+            }
+        }
+
+        throw new AssertionError("Should not reach here");
+    }
+
+    private static boolean isTransientFailure(RuntimeException e) {
+        return IgniteTestUtils.hasCause(e, ReplicationTimeoutException.class, null)
+                || IgniteTestUtils.hasCause(e, IgniteInternalException.class, "Failed to send message to node");
+    }
+
+    /**
+     * Executes the SQL query in a fresh session (passing {@code null} as the transaction) and converts its result set
+     * with the given extractor. The result set is closed after extraction.
+     *
+     * @param sql SQL query text.
+     * @param extractor Function turning the result set into the returned value.
+     * @param <T> Type of the result.
+     * @return Value produced by the extractor.
+     */
+    private <T> T query(String sql, Function<ResultSet, T> extractor) {
+        Function<Session, T> runAndExtract = session -> {
+            try (ResultSet rs = session.execute(null, sql)) {
+                return extractor.apply(rs);
+            }
+        };
+
+        return doInSession(runAndExtract);
+    }
+
+    /** Same as {@code query(sql, extractor)}, but executed through {@code withRetry}. */
+    private <T> T queryWithRetry(String sql, Function<ResultSet, T> extractor) {
+        Supplier<T> singleAttempt = () -> query(sql, extractor);
+
+        return withRetry(singleAttempt);
+    }
+
+    /**
+     * Drains the result set into a list of (int column 0, string column 1) tuples, preserving iteration order.
+     *
+     * @param rs Result set to read.
+     * @return Tuples built from every row of the result set.
+     */
+    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs) {
+        List<IgniteBiTuple<Integer, String>> result = new ArrayList<>();
+
+        while (rs.hasNext()) {
+            SqlRow row = rs.next();
+
+            int key = row.intValue(0);
+            String value = row.stringValue(1);
+
+            result.add(new IgniteBiTuple<>(key, value));
+        }
+
+        return result;
+    }
+
+    /** Runs the leader-feeds-follower scenario with {@code NodeKnockout.STOP} on the default storage engine (currently disabled). */
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is fixed")
+    void leaderFeedsFollowerWithSnapshotWithKnockoutStop() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.STOP, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /** Runs the leader-feeds-follower scenario with {@code NodeKnockout.PARTITION_NETWORK} on the default storage engine. */
+    @Test
+    void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Feeds node 2 with a snapshot of one row, transfers leadership of the sole partition to node 2 and checks that
+     * a query (with retries) returns exactly that row.
+     *
+     * @param knockout Way of knocking node 2 out.
+     * @param storageEngine Name of the storage engine to create the test table with.
+     */
+    private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String storageEngine) throws Exception {
+        feedNode2WithSnapshotOfOneRow(knockout, storageEngine);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    /** Same as the two-argument overload, using {@code DEFAULT_STORAGE_ENGINE}. */
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout) throws InterruptedException {
+        feedNode2WithSnapshotOfOneRow(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Prepares the cluster for installing a snapshot to node 2, then reanimates node 2 and waits until a snapshot
+     * is reported as installed on it.
+     *
+     * @param knockout Way of knocking node 2 out (and of reanimating it afterwards).
+     * @param storageEngine Name of the storage engine to create the test table with.
+     */
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine);
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+    }
+
+    /** Same as the two-argument overload, using {@code DEFAULT_STORAGE_ENGINE}. */
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Starts a 3-node cluster, creates the test table with 3 replicas, makes node 0 the leader of the sole partition,
+     * knocks node 2 out, inserts one row and finally causes log truncation on the partition leader.
+     *
+     * @param knockout Way of knocking node 2 out.
+     * @param storageEngine Name of the storage engine to create the test table with.
+     */
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(storageEngine);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        cluster.knockOutNode(2, knockout);
+
+        // The row is inserted while node 2 is knocked out.
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+    }
+
+    /**
+     * Creates table {@code test} with one partition and three replicas on the given storage engine and waits for
+     * the table to start.
+     *
+     * @param storageEngine Engine name that is concatenated into the {@code create table} statement.
+     */
+    private void createTestTableWith3Replicas(String storageEngine) throws InterruptedException {
+        String sql = "create table test (key int primary key, value varchar(20)) engine " + storageEngine
+                + " with partitions=1, replicas=3";
+
+        doInSession(session -> {
+            executeUpdate(sql, session);
+        });
+
+        waitForTableToStart();
+    }
+
+    /**
+     * Waits up to 10 seconds until the alive nodes host exactly 3 table partition RAFT nodes in total, failing the
+     * test otherwise.
+     */
+    private void waitForTableToStart() throws InterruptedException {
+        // TODO: IGNITE-18203 - remove this waiting because when a table creation query is executed, the table must be fully ready.
+
+        BooleanSupplier allRaftNodesStarted = () -> cluster.aliveNodes()
+                .map(ItTableRaftSnapshotsTest::tablePartitionIds)
+                .mapToInt(List::size)
+                .sum() == 3;
+
+        boolean startedInTime = waitForCondition(allRaftNodesStarted, 10_000);
+
+        assertTrue(startedInTime, "Did not see all table RAFT nodes started");
+    }
+
+    /** Takes two consecutive snapshots on the leader of the sole partition in order to trigger log truncation. */
+    private void causeLogTruncationOnSolePartitionLeader() throws InterruptedException {
+        // Doing this twice because first snapshot creation does not trigger log truncation.
+        doSnapshotOnSolePartitionLeader();
+        doSnapshotOnSolePartitionLeader();
+    }
+
+    /** Takes a snapshot on the leader of the sole table partition. */
+    private void doSnapshotOnSolePartitionLeader() throws InterruptedException {
+        doSnapshotOn(solePartitionId());
+    }
+
+    /**
+     * Returns the ID of the only table partition found on the entry node, asserting that there is exactly one.
+     *
+     * @return ID of the sole table partition.
+     */
+    private TablePartitionId solePartitionId() {
+        List<TablePartitionId> tablePartitionIds = tablePartitionIds(cluster.entryNode());
+
+        assertThat(tablePartitionIds.size(), is(1));
+
+        return tablePartitionIds.get(0);
+    }
+
+    /**
+     * Collects the group IDs of the node's local RAFT nodes that are {@link TablePartitionId}s.
+     *
+     * @param node Node whose RAFT manager is inspected.
+     * @return Table partition IDs, in the iteration order of the local RAFT nodes.
+     */
+    private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+        List<TablePartitionId> partitionIds = new ArrayList<>();
+
+        for (RaftNodeId raftNodeId : node.raftManager().localNodes()) {
+            Object groupId = raftNodeId.groupId();
+
+            // Groups that are not table partitions are skipped.
+            if (groupId instanceof TablePartitionId) {
+                partitionIds.add((TablePartitionId) groupId);
+            }
+        }
+
+        return partitionIds;
+    }
+
+    /**
+     * Requests a snapshot on the RAFT node of the leader service for the given partition and waits for its completion.
+     *
+     * <p>Fails the test if the snapshot callback is not invoked within 10 seconds or reports a non-OK status.
+     *
+     * @param tablePartitionId ID of the partition to snapshot.
+     */
+    private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedException {
+        CountDownLatch doneLatch = new CountDownLatch(1);
+        AtomicReference<Status> resultRef = new AtomicReference<>();
+
+        RaftGroupService leaderService = cluster.leaderServiceFor(tablePartitionId);
+
+        leaderService.getRaftNode().snapshot(result -> {
+            resultRef.set(result);
+            doneLatch.countDown();
+        });
+
+        boolean finishedInTime = doneLatch.await(10, TimeUnit.SECONDS);
+
+        assertTrue(finishedInTime, "Snapshot was not finished in time");
+
+        Status finalStatus = resultRef.get();
+
+        assertTrue(finalStatus.isOk(), "Snapshot failed: " + finalStatus);
+    }
+
+    /** Reanimates the node with index 2 and waits until a snapshot is reported as installed on it. */
+    private void reanimateNode2AndWaitForSnapshotInstalled(NodeKnockout knockout) throws InterruptedException {
+        reanimateNodeAndWaitForSnapshotInstalled(2, knockout);
+    }
+
+    private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, NodeKnockout knockout) throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
+
+        Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
+
+        var handler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().matches("Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true")) {
+                    snapshotInstalledLatch.countDown();
+                }
+            }
+        };
+
+        replicatorLogger.addHandler(handler);
+
+        try {
+            cluster.reanimateNode(nodeIndex, knockout);
+
+            assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
+        } finally {
+            replicatorLogger.removeHandler(handler);
+        }
+    }
+
+    private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException {
+        String nodeConsistentId = cluster.node(nodeIndex).node().name();
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            boolean transferred = tryTransferLeadershipOnSolePartitionTo(nodeConsistentId);
+
+            if (transferred) {
+                break;
+            }
+
+            if (attempt < maxAttempts) {
+                LOG.info("Did not transfer leadership after " + attempt + " attempts, going to retry...");
+            } else {
+                fail("Did not transfer leadership in time after " + maxAttempts + " attempts");
+            }
+        }
+    }
+
+    /**
+     * Initiates a leadership transfer of the sole partition to the given node and waits up to 10 seconds for the
+     * previous leader to report the target as the leader.
+     *
+     * @param targetLeaderConsistentId Consistent ID of the node that should become the leader.
+     * @return Result of waiting for the leader change condition.
+     */
+    private boolean tryTransferLeadershipOnSolePartitionTo(String targetLeaderConsistentId) throws InterruptedException {
+        NodeImpl leaderBeforeTransfer = (NodeImpl) cluster.leaderServiceFor(solePartitionId()).getRaftNode();
+
+        initiateLeadershipTransferTo(targetLeaderConsistentId, leaderBeforeTransfer);
+
+        // The leader ID is read from the node that was the leader before the transfer was initiated.
+        BooleanSupplier leaderTransferred = () -> {
+            PeerId leaderId = leaderBeforeTransfer.getLeaderId();
+            return leaderId != null && leaderId.getConsistentId().equals(targetLeaderConsistentId);
+        };
+
+        return waitForCondition(leaderTransferred, 10_000);
+    }
+
+    /**
+     * Keeps asking the current leader to transfer leadership to the target until the returned status is something
+     * other than {@code RaftError.EBUSY}.
+     *
+     * @param targetLeaderConsistentId Consistent ID of the node that should become the leader.
+     * @param leaderBeforeTransfer Node to request the transfer from.
+     * @throws IllegalStateException If {@code EBUSY} is still returned after more than 10 seconds of trying.
+     */
+    private static void initiateLeadershipTransferTo(String targetLeaderConsistentId, NodeImpl leaderBeforeTransfer) {
+        long deadlineMillis = System.currentTimeMillis() + 10_000;
+
+        while (true) {
+            Status transferStatus = leaderBeforeTransfer.transferLeadershipTo(new PeerId(targetLeaderConsistentId));
+
+            boolean busy = transferStatus.getRaftError() == RaftError.EBUSY;
+
+            if (!busy) {
+                return;
+            }
+
+            if (System.currentTimeMillis() > deadlineMillis) {
+                throw new IllegalStateException("Could not initiate leadership transfer to " + targetLeaderConsistentId + " in time");
+            }
+        }
+    }
+
+    /** Runs the transaction semantics scenario with {@code NodeKnockout.STOP} (currently disabled). */
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is resolved")
+    void txSemanticsIsMaintainedWithKnockoutStop() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.STOP);
+    }
+
+    /** Runs the transaction semantics scenario with {@code NodeKnockout.PARTITION_NETWORK}. */
+    @Test
+    void txSemanticsIsMaintainedWithKnockoutPartitionNetwork() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout.PARTITION_NETWORK);
+    }
+
+    /**
+     * Checks that a row written in an explicit transaction is visible after node 2 receives a snapshot and becomes
+     * the leader.
+     *
+     * <p>The row is inserted in a transaction started on the entry node; node 2 is knocked out after the insert but
+     * before the commit. Then log truncation is caused on the leader, node 2 is reanimated (waiting for snapshot
+     * installation), leadership is transferred to node 2 and the table contents are verified.
+     *
+     * @param knockout Way of knocking node 2 out.
+     */
+    private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockout) throws Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        Transaction tx = cluster.entryNode().transactions().begin();
+
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session, tx);
+
+            // Node 2 is knocked out between the insert and the commit.
+            cluster.knockOutNode(2, knockout);
+
+            tx.commit();
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    /**
+     * Runs the leader-feeds-follower scenario with {@code NodeKnockout.DEFAULT} once per listed storage engine.
+     *
+     * @param storageEngine Name of the storage engine to create the test table with.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {
+            RocksDbStorageEngine.ENGINE_NAME,
+            PersistentPageMemoryStorageEngine.ENGINE_NAME,
+            VolatilePageMemoryStorageEngine.ENGINE_NAME
+    })
+    void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(NodeKnockout.DEFAULT, storageEngine);
+    }
+
+    /**
+     * Feeds node 2 with a snapshot of one row, inserts a second row, transfers leadership to node 2 and checks that
+     * both rows are returned (currently disabled).
+     */
+    @Test
+    @Disabled("Enable when IGNITE-18432 is fixed")
+    void entriesKeepAddendedAfterSnapshotInstallation() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        // This row is inserted after the snapshot has been installed on node 2.
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    /**
+     * Keeps inserting rows in the background while node 2 is reanimated and a snapshot is installed on it, then
+     * transfers leadership to node 2 and checks that the table keys are exactly 1 through the last loaded key
+     * (currently disabled).
+     */
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void entriesKeepAddendedDuringSnapshotInstallation() throws Exception {
+        NodeKnockout knockout = NodeKnockout.DEFAULT;
+
+        prepareClusterForInstallingSnapshotToNode2(knockout);
+
+        AtomicBoolean installedSnapshot = new AtomicBoolean(false);
+        AtomicInteger lastLoadedKey = new AtomicInteger();
+
+        // Background loader: inserts keys starting from 2 (key 1 was inserted during preparation) until the flag is set.
+        CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
+            for (int i = 2; !installedSnapshot.get(); i++) {
+                int key = i;
+                doInSession(session -> {
+                    executeUpdate("insert into test(key, value) values (" + key + ", 'extra')", session);
+                    lastLoadedKey.set(key);
+                });
+            }
+        });
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        // Signal the loader to stop and wait for it to finish.
+        installedSnapshot.set(true);
+
+        assertThat(loadingFuture, willSucceedIn(30, TimeUnit.SECONDS));
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<Integer> keys = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows)
+                .stream().map(IgniteBiTuple::get1).collect(toList());
+
+        assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+    }
+
+    /**
+     * Feeds node 2 with a snapshot, makes it the leader, then knocks node 0 out, inserts a second row, causes log
+     * truncation, reanimates node 0 (waiting for snapshot installation on it), makes node 0 the leader and checks
+     * that both rows are returned (currently disabled).
+     */
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception {
+        feedNode2WithSnapshotOfOneRow(NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        cluster.knockOutNode(0, NodeKnockout.DEFAULT);
+
+        // This row is inserted while node 0 is knocked out.
+        doInSession(session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNodeAndWaitForSnapshotInstalled(0, NodeKnockout.DEFAULT);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry("select * from test order by key", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    private class Cluster {
+        /** Base port number. */
+        private static final int BASE_PORT = 3344;
+
+        /**
+         * Nodes bootstrap configuration pattern.
+         *
+         * <p>rpcIntallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+         * allow tests pass thanks to retries.
+         */
+        private static final String NODE_BOOTSTRAP_CFG = "{\n"
+                + "  \"network\": {\n"
+                + "    \"port\":{},\n"
+                + "    \"nodeFinder\":{\n"
+                + "      \"netClusterNodes\": [ {} ]\n"
+                + "    }\n"
+                + "  },\n"
+                + "  \"raft\": {"
+                + "    \"rpcInstallSnapshotTimeout\": 10000"
+                + "  }"
+                + "}";
+
+        private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+        private final TestInfo testInfo;
+
+        /** Cluster nodes. */
+        private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+        private volatile boolean started = false;
+
+        private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+        private Cluster(TestInfo testInfo) {
+            this.testInfo = testInfo;
+        }
+
+        void startAndInit(int nodeCount) {
+            if (started) {
+                throw new IllegalStateException("The cluster is already started");
+            }
+
+            List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                    .mapToObj(this::startClusterNode)
+                    .collect(toList());
+
+            String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+            IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+            for (CompletableFuture<IgniteImpl> future : futures) {
+                assertThat(future, willCompleteSuccessfully());
+
+                nodes.add(future.join());
+            }
+
+            started = true;
+        }
+
+        private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+            String nodeName = testNodeName(testInfo, nodeIndex);
+
+            String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+            return IgnitionManager.start(nodeName, config, nodeWorkDir(nodeName))
+                    .thenApply(IgniteImpl.class::cast);
+        }
+
+        IgniteImpl node(int index) {
+            return nodes.get(index);
+        }
+
+        /**
+         * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+         */
+        IgniteImpl entryNode() {
+            return IntStream.range(0, nodes.size())
+                    .filter(index -> nodes.get(index) != null)
+                    .filter(index -> !knockedOutIndices.contains(index))
+                    .mapToObj(nodes::get)
+                    .findAny()
+                    .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+        }
+
+        void stopNode(int index) {
+            IgnitionManager.stop(nodes.get(index).name());
+
+            nodes.set(index, null);
+        }
+
+        void restartNode(int index) {
+            stopNode(index);
+
+            startNode(index);
+        }
+
+        void startNode(int index) {
+            IgniteImpl newIgniteNode;
+
+            try {
+                newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new RuntimeException(e);
+            }
+
+            nodes.set(index, newIgniteNode);
+        }
+
+        RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+            AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+            assertTrue(
+                    waitForCondition(() -> {
+                        RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                        serviceRef.set(service);
+
+                        return service != null;
+                    }, 10_000),
+                    "Did not find a leader for " + tablePartitionId + " in time"
+            );
+
+            RaftGroupService result = serviceRef.get();
+
+            assertNotNull(result);
+
+            return result;
+        }
+
+        @Nullable
+        private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+            return aliveNodes()
+                    .map(IgniteImpl.class::cast)
+                    .map(ignite -> {
+                        JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                        Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                                .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                                .findAny();
+
+                        return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                    })
+                    .filter(Objects::nonNull)
+                    .filter(service -> service.getRaftNode().isLeader())
+                    .findAny()
+                    .orElse(null);
+        }
+
+        private Stream<IgniteImpl> aliveNodes() {
+            return nodes.stream().filter(Objects::nonNull);
+        }
+
+        private Session openSession() {
+            return entryNode().sql()
+                    .sessionBuilder()
+                    .defaultSchema("PUBLIC")
+                    .defaultQueryTimeout(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                    .build();
+        }
+
+        void shutdown() {
+            aliveNodes().forEach(node -> IgnitionManager.stop(node.name()));
+        }
+
+        private void knockOutNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.knockOutNode(nodeIndex, this);
+
+            knockedOutIndices.add(nodeIndex);
+        }
+
+        private void reanimateNode(int nodeIndex, NodeKnockout knockout) {
+            knockout.reanimateNode(nodeIndex, this);
+
+            knockedOutIndices.remove(nodeIndex);
+        }
+    }
+
+    /**
+     * A way to make a node be separated from a cluster and stop receiving updates.
+     */
+    private enum NodeKnockout {

Review Comment:
   Why enum and not just methods and functions?



##########
modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java:
##########
@@ -84,6 +86,11 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
 
     @BeforeEach
     public void setUp(TestInfo testInfo) {
+        ConfigurationValue<Integer> rpcInstallSnapshotTimeutValue = mock(ConfigurationValue.class);

Review Comment:
   Why not use `@InjectConfiguration`?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java:
##########
@@ -188,10 +188,12 @@ void test() throws Exception {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework
+        //verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
         verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
 
-        verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
+        // TODO: IGNITE-18022 - uncomment the following line or remove it if not needed after the rework

Review Comment:
   I think I confused you and you need to specify **IGNITE-18030**



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -201,6 +201,11 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Network
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;
+        if (dropMessage != null && dropMessage.test(recipient.name(), msg)) {
+            return new CompletableFuture<NetworkMessage>().orTimeout(10, TimeUnit.MILLISECONDS);

Review Comment:
   The documentation of `org.apache.ignite.network.DefaultMessagingService#dropMessages` says that the future will not be completed, but here it turns out that it will be completed.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -78,15 +78,18 @@ public TxStateStorage txStatePartitionStorage() {
     public CompletableFuture<MvPartitionStorage> reCreateMvPartitionStorage() throws StorageException {
         assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        return mvTableStorage.destroyPartition(partId())
+        // TODO: IGNITE-18022 - actually recreate or do in a different way

Review Comment:
   I think I confused you and you need to specify **IGNITE-18030**



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;

Review Comment:
   I think it's worth testing all storages.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {

Review Comment:
   Why not use `org.apache.ignite.internal.AbstractClusterIntegrationTest`?
   
   There is no documentation for the methods and tests, and it is not quite clear to me what is being tested and what we expect; please add documentation.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private static final String DEFAULT_STORAGE_ENGINE = RocksDbStorageEngine.ENGINE_NAME;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo);
+    }
+
+    @AfterEach
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    private Path nodeWorkDir(String nodeName) {
+        return workDir.resolve(nodeName);
+    }
+
+    @Test
+    void snapshotReadOnRestartWorksCorrectly() {

Review Comment:
   I don't quite understand what is being tested here. It looks more like local recovery.



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -275,7 +280,13 @@ private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (RuntimeException e) {
+                    LOG.warn("onMessage() failed while processing " + obj.message() + " from " + obj.consistentId(), e);

Review Comment:
   Let's log at debug level with a `Supplier` or `IgniteStringFormatter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org