You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/01/04 09:11:47 UTC

[ignite-3] branch main updated: IGNITE-18079 Integrate RAFT streaming snapshots (#1461)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ce17e6eb22 IGNITE-18079 Integrate RAFT streaming snapshots (#1461)
ce17e6eb22 is described below

commit ce17e6eb2240f77468a36873947118b902de7005
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Jan 4 13:11:42 2023 +0400

    IGNITE-18079 Integrate RAFT streaming snapshots (#1461)
---
 .../internal/cluster/management/MockNode.java      |  13 +-
 .../testframework/ConfigurationExtension.java      |   3 +-
 .../internal/testframework/IgniteTestUtils.java    |  10 +-
 modules/metastorage-client/build.gradle            |   1 +
 modules/metastorage-client/pom.xml                 |   7 +
 .../client/ItMetaStorageServiceTest.java           |   8 +-
 .../ignite/network/DefaultMessagingService.java    |  48 +-
 .../configuration/RaftConfigurationSchema.java     |  10 +
 modules/raft/build.gradle                          |   1 +
 .../internal/raft/ItRaftGroupServiceTest.java      |   5 +-
 .../java/org/apache/ignite/internal/raft/Loza.java |   6 +
 .../ignite/raft/jraft/option/NodeOptions.java      |   1 +
 .../org/apache/ignite/internal/raft/LozaTest.java  |   8 +-
 .../internal/AbstractClusterIntegrationTest.java   |   1 +
 .../java/org/apache/ignite/internal/Cluster.java   | 434 ++++++++++++
 .../org/apache/ignite/internal/SessionUtils.java   |  52 ++
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 725 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   9 +
 .../raft/snapshot/PartitionAccessImpl.java         |   7 +-
 .../snapshot/outgoing/OutgoingSnapshotReader.java  |   8 +
 .../incoming/IncomingSnapshotCopierTest.java       |   6 +-
 21 files changed, 1342 insertions(+), 21 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index c814d46be5..3e8dd255eb 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.cluster.management;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -25,12 +28,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.util.ReverseIterator;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -77,7 +82,13 @@ public class MockNode {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);
+        ConfigurationValue<Integer> rpcInstallSnapshotTimeoutValue = mock(ConfigurationValue.class);
+
+        when(raftConfiguration.rpcInstallSnapshotTimeout()).thenReturn(rpcInstallSnapshotTimeoutValue);
+        when(rpcInstallSnapshotTimeoutValue.value()).thenReturn(10);
+
+        Loza raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
 
         var clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve("cmg"));
 
diff --git a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
index 2a81dc9480..8798a6ca3b 100644
--- a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
+++ b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.po
 import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.touch;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigObject;
@@ -273,7 +274,7 @@ public class ConfigurationExtension implements BeforeEachCallback, AfterEachCall
         );
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
-        RootKey rootKey = mock(RootKey.class);
+        RootKey rootKey = mock(RootKey.class, withSettings().lenient());
 
         when(rootKey.key()).thenReturn("mock");
         when(rootKey.type()).thenReturn(LOCAL);
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index efbff359ca..a7a700a0ed 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -303,27 +303,27 @@ public final class IgniteTestUtils {
      *
      * @param t   Throwable to check.
      * @param cls Cause classes to check.
-     * @param msg Message text that should be in cause (if {@code null}, message won't be checked).
+     * @param messageFragment Fragment that must be a substring of a cause message (if {@code null}, message won't be checked).
      * @return {@code True} if one of the causing exception is an instance of passed in classes, {@code false} otherwise.
      */
     public static boolean hasCause(
             @NotNull Throwable t,
             @NotNull Class<?> cls,
-            @Nullable String msg
+            @Nullable String messageFragment
     ) {
         for (Throwable th = t; th != null; th = th.getCause()) {
             if (cls.isAssignableFrom(th.getClass())) {
-                if (msg == null) {
+                if (messageFragment == null) {
                     return true;
                 }
 
-                if (th.getMessage() != null && th.getMessage().contains(msg)) {
+                if (th.getMessage() != null && th.getMessage().contains(messageFragment)) {
                     return true;
                 }
             }
 
             for (Throwable n : th.getSuppressed()) {
-                if (hasCause(n, cls, msg)) {
+                if (hasCause(n, cls, messageFragment)) {
                     return true;
                 }
             }
diff --git a/modules/metastorage-client/build.gradle b/modules/metastorage-client/build.gradle
index c2f9fbee67..538ab583a3 100644
--- a/modules/metastorage-client/build.gradle
+++ b/modules/metastorage-client/build.gradle
@@ -39,6 +39,7 @@ dependencies {
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-raft')))
     integrationTestImplementation(testFixtures(project(':ignite-network')))
+    integrationTestImplementation(testFixtures(project(':ignite-configuration')))
     integrationTestImplementation project(':ignite-metastorage-server')
     integrationTestImplementation project(':ignite-api')
     integrationTestImplementation project(':ignite-raft')
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index 72865f5c03..cc4dc0639a 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -106,6 +106,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <!-- Logging in tests -->
         <dependency>
             <groupId>org.slf4j</groupId>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 313c49a08d..93d559de16 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -65,6 +65,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -118,6 +120,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  * Meta storage client tests.
  */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 @ExtendWith(MockitoExtension.class)
 public class ItMetaStorageServiceTest {
     /** The logger. */
@@ -182,6 +185,9 @@ public class ItMetaStorageServiceTest {
     @WorkDirectory
     private Path dataPath;
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     static {
         EXPECTED_RESULT_MAP = new TreeMap<>();
 
@@ -1076,7 +1082,7 @@ public class ItMetaStorageServiceTest {
     ) throws NodeStoppingException {
         var raftManager = new Loza(
                 node,
-                mock(RaftConfiguration.class),
+                raftConfiguration,
                 dataPath.resolve("raftManager" + raftManagers.size()),
                 new HybridClockImpl());
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index a984ddcfdc..f39716614c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -91,8 +91,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
             new NamedThreadFactory("MessagingService-inbound-", LOG)
     );
 
+    // TODO: IGNITE-18493 - remove/move this
     @Nullable
-    private volatile BiPredicate<String, NetworkMessage> dropMessagePredicate;
+    private volatile BiPredicate<String, NetworkMessage> dropMessagesPredicate;
 
     /**
      * Constructor.
@@ -166,8 +167,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
             return failedFuture(new NodeStoppingException());
         }
 
-        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;
-        if (dropMessage != null && dropMessage.test(recipient.name(), msg)) {
+        // TODO: IGNITE-18493 - remove/move this
+        if (shouldDropMessage(recipient, msg)) {
             return new CompletableFuture<>();
         }
 
@@ -188,6 +189,12 @@ public class DefaultMessagingService extends AbstractMessagingService {
         return sendMessage0(recipient.name(), recipientAddress, message);
     }
 
+    private boolean shouldDropMessage(ClusterNode recipient, NetworkMessage msg) {
+        BiPredicate<String, NetworkMessage> predicate = dropMessagesPredicate;
+
+        return predicate != null && predicate.test(recipient.name(), msg);
+    }
+
     /**
      * Sends an invocation request. If the target is the current node, then message will be delivered immediately.
      *
@@ -201,6 +208,11 @@ public class DefaultMessagingService extends AbstractMessagingService {
             return failedFuture(new NodeStoppingException());
         }
 
+        // TODO: IGNITE-18493 - remove/move this
+        if (shouldDropMessage(recipient, msg)) {
+            return new CompletableFuture<NetworkMessage>().orTimeout(10, TimeUnit.MILLISECONDS);
+        }
+
         long correlationId = createCorrelationId();
 
         CompletableFuture<NetworkMessage> responseFuture = new CompletableFuture<NetworkMessage>()
@@ -275,7 +287,17 @@ public class DefaultMessagingService extends AbstractMessagingService {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            inboundExecutor.execute(() -> onMessage(obj));
+            inboundExecutor.execute(() -> {
+                try {
+                    onMessage(obj);
+                } catch (Throwable e) {
+                    LOG.error("onMessage() failed while processing {} from {}", e, obj.message(), obj.consistentId());
+
+                    if (e instanceof Error) {
+                        throw e;
+                    }
+                }
+            });
 
             return;
         }
@@ -400,18 +422,30 @@ public class DefaultMessagingService extends AbstractMessagingService {
         IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
     }
 
+    // TODO: IGNITE-18493 - remove/move this
     /**
      * Installs a predicate, it will be consulted with for each message being sent; when it returns {@code true}, the
-     * message will be silently dropped (it will not be sent, the corresponding future will never complete).
+     * message will be dropped (it will not be sent; the corresponding future will time out soon for {@code invoke()} methods
+     * and will never complete for methods different from {@code invoke()}).
      *
      * @param predicate Predicate that will decide whether a message should be dropped. Its first argument is the recipient
      *     node's consistent ID.
      */
     @TestOnly
     public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
-        dropMessagePredicate = predicate;
+        dropMessagesPredicate = predicate;
+    }
+
+    /**
+     * Returns a predicate used to decide whether a message should be dropped, or {@code null} if message dropping is disabled.
+     */
+    @TestOnly
+    @Nullable
+    public BiPredicate<String, NetworkMessage> dropMessagesPredicate() {
+        return dropMessagesPredicate;
     }
 
+    // TODO: IGNITE-18493 - remove/move this
     /**
      * Stops dropping messages.
      *
@@ -419,6 +453,6 @@ public class DefaultMessagingService extends AbstractMessagingService {
      */
     @TestOnly
     public void stopDroppingMessages() {
-        dropMessagePredicate = null;
+        dropMessagesPredicate = null;
     }
 }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java
index a1834da885..c42c11be67 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.configuration;
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
@@ -27,6 +28,15 @@ import org.apache.ignite.configuration.annotation.ConfigurationType;
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /**
+     * RPC Timeout for InstallSnapshot request (in milliseconds). This is the maximum allowed duration from sending
+     * InstallSnapshot request and getting a response to it; during it, the snapshot must be fully transferred to
+     * a recipient and installed.
+     */
+    @Value(hasDefault = true)
+    // TODO: IGNITE-18480 - is 5 minutes a good default?
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;
+
     /** Configuration for Raft groups corresponding to table partitions. */
     // TODO: IGNITE-16647 - Volatile RAFT configuration should be moved elsewhere
     @ConfigValue
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 495f49e737..739337c880 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -43,6 +43,7 @@ dependencies {
     testAnnotationProcessor libs.jmh.annotation.processor
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-network')))
+    testImplementation(testFixtures(project(':ignite-configuration')))
     testImplementation project(':ignite-configuration')
     testImplementation project(':ignite-core')
     testImplementation project(':ignite-network')
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 10a4a25466..c244f62e13 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -40,6 +40,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -65,6 +67,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  * Integration test methods of raft group service.
  */
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class ItRaftGroupServiceTest extends IgniteAbstractTest {
     private static final int NODES_CNT = 2;
 
@@ -79,7 +82,7 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
     @Mock
     private RaftGroupEventsListener eventsListener;
 
-    @Mock
+    @InjectConfiguration
     private RaftConfiguration raftConfiguration;
 
     @BeforeEach
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 968313d7c8..d293eeb412 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -97,6 +97,8 @@ public class Loza implements RaftManager {
     /** Raft configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    private final NodeOptions opts;
+
     /**
      * The constructor.
      *
@@ -133,6 +135,8 @@ public class Loza implements RaftManager {
         options.setClock(clock);
         options.setSafeTimeTracker(safeTimeTracker);
 
+        this.opts = options;
+
         this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options);
 
         this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
@@ -145,6 +149,8 @@ public class Loza implements RaftManager {
     /** {@inheritDoc} */
     @Override
     public void start() {
+        opts.setRpcInstallSnapshotTimeout(raftConfiguration.rpcInstallSnapshotTimeout().value());
+
         raftServer.start();
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 7518e40927..ae5b6725a6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -645,6 +645,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setSharedPools(this.isSharedPools());
         nodeOptions.setRpcDefaultTimeout(this.getRpcDefaultTimeout());
         nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
+        nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
         nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
         nodeOptions.setClock(this.getClock());
         nodeOptions.setSafeTimeTracker(this.getSafeTimeTracker());
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index d32d000b2c..bb60d5e4fb 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Objects;
 import java.util.Set;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -42,11 +44,15 @@ import org.mockito.junit.jupiter.MockitoExtension;
  * It is mocking all components except Loza and checks API methods of the component in various conditions.
  */
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class LozaTest extends IgniteAbstractTest {
     /** Mock for network service. */
     @Mock
     private ClusterService clusterNetSvc;
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     /**
      * Checks that the all API methods throw the exception ({@link org.apache.ignite.lang.NodeStoppingException})
      * when Loza is closed.
@@ -59,7 +65,7 @@ public class LozaTest extends IgniteAbstractTest {
         Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
         Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
 
-        Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClockImpl());
+        Loza loza = new Loza(clusterNetSvc, raftConfiguration, workDir, new HybridClockImpl());
 
         loza.start();
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 7a5dd7035d..f00ea7e66b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -53,6 +53,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @SuppressWarnings("ALL")
 @ExtendWith(WorkDirectoryExtension.class)
+// TODO: IGNITE-18465- use Cluster class
 public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = Loggers.forClass(AbstractClusterIntegrationTest.class);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
new file mode 100644
index 0000000000..27a7e78baf
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodesIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return Objects.requireNonNull(nodes.get(index));
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl aliveNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutNodesIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        checkNodeIndex(index);
+
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    private void checkNodeIndex(int index) {
+        if (index < 0) {
+            throw new IllegalArgumentException("Index cannot be negative");
+        }
+        if (index >= nodes.size()) {
+            throw new IllegalArgumentException("Cluster only contains " + nodes.size() + " nodes, but node with index "
+                    + index + " was tried to be accessed");
+        }
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        checkNodeIndex(index);
+
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {
+        stopNode(index);
+
+        return startNode(index);
+    }
+
+    /**
+     * Returns {@link RaftGroupService} that is the leader for the given table partition.
+     *
+     * @param tablePartitionId Table partition ID for which a leader is to be found.
+     * @return {@link RaftGroupService} that is the leader for the given table partition.
+     * @throws InterruptedException Thrown if interrupted while waiting for the leader to be found.
+     */
+    public RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+        AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                    serviceRef.set(service);
+
+                    return service != null;
+                }, 10_000),
+                "Did not find a leader for " + tablePartitionId + " in time"
+        );
+
+        RaftGroupService result = serviceRef.get();
+
+        assertNotNull(result);
+
+        return result;
+    }
+
+    @Nullable
+    private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+        return runningNodes()
+                .map(IgniteImpl.class::cast)
+                .flatMap(ignite -> {
+                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                            .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                            .findAny();
+
+                    return maybeRaftNodeId.map(server::raftGroupService).stream();
+                })
+                .filter(service -> service.getRaftNode().isLeader())
+                .findAny()
+                .orElse(null);
+    }
+
+    /**
+     * Returns nodes that are started and not stopped. This can include knocked out nodes.
+     */
+    public Stream<IgniteImpl> runningNodes() {
+        return nodes.stream().filter(Objects::nonNull);
+    }
+
+    /**
+     * Opens a {@link Session} (that can be used to execute SQL queries) through a node with the given index.
+     *
+     * @param nodeIndex Index of the node on which to open a session.
+     * @return A session.
+     */
+    public Session openSession(int nodeIndex) {
+        return node(nodeIndex).sql()
+                .sessionBuilder()
+                .defaultSchema("PUBLIC")
+                .defaultQueryTimeout(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    /**
+     * Shuts down the  cluster by stopping all its nodes.
+     */
+    public void shutdown() {
+        runningNodes().forEach(node -> IgnitionManager.stop(node.name()));
+    }
+
+    /**
+     * Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back,
+     * {@link #reanimateNode(int, NodeKnockout)} should be used.
+     */
+    public void knockOutNode(int nodeIndex, NodeKnockout knockout) {
+        knockout.knockOutNode(nodeIndex, this);
+
+        knockedOutNodesIndices.add(nodeIndex);
+    }
+
+    /**
+     * Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the
+     * effect of {@link #knockOutNode(int, NodeKnockout)}.
+     */
+    public void reanimateNode(int nodeIndex, NodeKnockout knockout) {
+        knockout.reanimateNode(nodeIndex, this);
+
+        knockedOutNodesIndices.remove(nodeIndex);
+    }
+
+    /**
+     * Executes an action with a {@link Session} opened via a node with the given index.
+     *
+     * @param nodeIndex Index of node on which to execute the action.
+     * @param action Action to execute.
+     */
+    public void doInSession(int nodeIndex, Consumer<Session> action) {
+        try (Session session = openSession(nodeIndex)) {
+            action.accept(session);
+        }
+    }
+
+    /**
+     * Returns result of executing an action with a {@link Session} opened via a node with the given index.
+     *
+     * @param nodeIndex Index of node on which to execute the action.
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> T doInSession(int nodeIndex, Function<Session, T> action) {
+        try (Session session = openSession(nodeIndex)) {
+            return action.apply(session);
+        }
+    }
+
+    /**
+     * Executes a SQL query on a node with the given index.
+     *
+     * @param nodeIndex Index of node on which to execute the query.
+     * @param sql SQL query to execute.
+     * @param extractor Used to extract the result from a {@link ResultSet}.
+     * @return Query result.
+     */
+    public <T> T query(int nodeIndex, String sql, Function<ResultSet, T> extractor) {
+        return doInSession(nodeIndex, session -> {
+            try (ResultSet resultSet = session.execute(null, sql)) {
+                return extractor.apply(resultSet);
+            }
+        });
+    }
+
+    /**
+     * A way to make a node be separated from a cluster and stop receiving updates.
+     */
+    public enum NodeKnockout {
+        /** Stop a node to knock it out. */
+        STOP {
+            @Override
+            void knockOutNode(int nodeIndex, Cluster cluster) {
+                cluster.stopNode(nodeIndex);
+            }
+
+            @Override
+            void reanimateNode(int nodeIndex, Cluster cluster) {
+                cluster.startNode(nodeIndex);
+            }
+        },
+        /** Emulate a network partition so that messages to the knocked-out node are dropped. */
+        PARTITION_NETWORK {
+            @Override
+            void knockOutNode(int nodeIndex, Cluster cluster) {
+                IgniteImpl recipient = cluster.node(nodeIndex);
+
+                cluster.runningNodes()
+                        .filter(node -> node != recipient)
+                        .forEach(sourceNode -> {
+                            sourceNode.dropMessages(
+                                    new AddCensorshipByRecipientConsistentId(recipient.name(), sourceNode.dropMessagesPredicate())
+                            );
+                        });
+
+                LOG.info("Knocked out node " + nodeIndex + " with an artificial network partition");
+            }
+
+            @Override
+            void reanimateNode(int nodeIndex, Cluster cluster) {
+                IgniteImpl receiver = cluster.node(nodeIndex);
+
+                cluster.runningNodes()
+                        .filter(node -> node != receiver)
+                        .forEach(ignite -> {
+                            var censor = (AddCensorshipByRecipientConsistentId) ignite.dropMessagesPredicate();
+
+                            assertNotNull(censor);
+
+                            if (censor.prevPredicate == null) {
+                                ignite.stopDroppingMessages();
+                            } else {
+                                ignite.dropMessages(censor.prevPredicate);
+                            }
+                        });
+
+                LOG.info("Reanimated node " + nodeIndex + " by removing an artificial network partition");
+            }
+        };
+
+        /**
+         * Knocks out a node so that it stops receiving messages from other nodes of the cluster. To bring a node back,
+         * {@link #reanimateNode(int, Cluster)} should be used.
+         */
+        abstract void knockOutNode(int nodeIndex, Cluster cluster);
+
+        /**
+         * Reanimates a knocked-out node so that it starts receiving messages from other nodes of the cluster again. This nullifies the
+         * effect of {@link #knockOutNode(int, Cluster)}.
+         */
+        abstract void reanimateNode(int nodeIndex, Cluster cluster);
+    }
+
+    private static class AddCensorshipByRecipientConsistentId implements BiPredicate<String, NetworkMessage> {
+        private final String recipientName;
+        @Nullable
+        private final BiPredicate<String, NetworkMessage> prevPredicate;
+
+        private AddCensorshipByRecipientConsistentId(String recipientName, @Nullable BiPredicate<String, NetworkMessage> prevPredicate) {
+            this.recipientName = recipientName;
+            this.prevPredicate = prevPredicate;
+        }
+
+        @Override
+        public boolean test(String recipientConsistentId, NetworkMessage networkMessage) {
+            return Objects.equals(recipientConsistentId, recipientName)
+                    || (prevPredicate != null && prevPredicate.test(recipientConsistentId, networkMessage));
+        }
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
new file mode 100644
index 0000000000..d0562a1373
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utils to work with {@link Session}.
+ */
+public class SessionUtils {
+    /**
+     * Executes an update on a session, possibly in a transaction.
+     *
+     * @param sql SQL query to execute.
+     * @param session Session on which to execute.
+     * @param transaction Transaction in which to execute the update, or {@code null} if the update should
+     *     be executed n an implicit transaction.
+     */
+    public static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
+        try (ResultSet ignored = session.execute(transaction, sql)) {
+            // Do nothing, just adhere to the syntactic ceremony...
+        }
+    }
+
+    /**
+     * Executes an update on a session in an implicit transaction.
+     *
+     * @param sql SQL query to execute.
+     * @param session Session on which to execute.
+     */
+    public static void executeUpdate(String sql, Session session) {
+        executeUpdate(sql, session, null);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
new file mode 100644
index 0000000000..77d24e6007
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -0,0 +1,725 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    /**
+     * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
+     * to a storage engine used.
+     */
+    private static final String DEFAULT_STORAGE_ENGINE = "<default>";
+
+    /**
+     * {@link NodeKnockout} that is used by tests that are indifferent for the knockout strategy being used.
+     */
+    private static final NodeKnockout DEFAULT_KNOCKOUT = NodeKnockout.PARTITION_NETWORK;
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    /**
+     * Executes the given action, retrying it up to a few times if a transient failure occurs (like node unavailability).
+     */
+    private static <T> T withRetry(Supplier<T> action) {
+        // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
+
+        int maxAttempts = 4;
+        int sleepMillis = 500;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            try {
+                return action.get();
+            } catch (RuntimeException e) {
+                if (attempt < maxAttempts && isTransientFailure(e)) {
+                    LOG.warn("Attempt " + attempt + " failed, going to retry", e);
+                } else {
+                    throw e;
+                }
+            }
+
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                fail("Interrupted while waiting for next attempt");
+            }
+
+            sleepMillis = sleepMillis * 2;
+        }
+
+        throw new AssertionError("Should not reach here");
+    }
+
+    private static boolean isTransientFailure(RuntimeException e) {
+        return hasCause(e, ReplicationTimeoutException.class, null)
+                || hasCause(e, IgniteInternalException.class, "Failed to send message to node")
+                || hasCause(e, IgniteInternalCheckedException.class, "Failed to execute query, node left")
+                || hasCause(e, SqlValidatorException.class, "Object 'TEST' not found");
+    }
+
+    private <T> T queryWithRetry(int nodeIndex, String sql, Function<ResultSet, T> extractor) {
+        return withRetry(() -> cluster.query(nodeIndex, sql, extractor));
+    }
+
+    /**
+     * Reads all rows from TEST table.
+     */
+    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs) {
+        List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
+
+        while (rs.hasNext()) {
+            SqlRow sqlRow = rs.next();
+
+            rows.add(new IgniteBiTuple<>(sqlRow.intValue(0), sqlRow.stringValue(1)));
+        }
+
+        return rows;
+    }
+
+    /**
+     * Tests that a leader successfully feeds a follower with a RAFT snapshot (using {@link NodeKnockout#STOP} strategy
+     * to knock-out the follower to make it require a snapshot installation).
+     */
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is fixed")
+    void leaderFeedsFollowerWithSnapshotWithKnockoutStop() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.STOP, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Tests that a leader successfully feeds a follower with a RAFT snapshot (using {@link NodeKnockout#PARTITION_NETWORK} strategy
+     * to knock-out the follower to make it require a snapshot installation).
+     */
+    @Test
+    void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Tests that a leader successfully feeds a follower with a RAFT snapshot (using the given {@link NodeKnockout} strategy
+     * to knock-out the follower to make it require a snapshot installation and the given storage engine).
+     */
+    private void testLeaderFeedsFollowerWithSnapshot(NodeKnockout knockout, String storageEngine) throws Exception {
+        feedNode2WithSnapshotOfOneRow(knockout, storageEngine);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout) throws InterruptedException {
+        feedNode2WithSnapshotOfOneRow(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    private void feedNode2WithSnapshotOfOneRow(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine);
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+    }
+
+    /**
+     * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition
+     * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism.
+     *
+     * @param knockout The knock-out strategy that was used to knock-out node 2 and that will be used to reanimate it.
+     * @see NodeKnockout
+     */
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, DEFAULT_STORAGE_ENGINE);
+    }
+
+    /**
+     * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition
+     * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism.
+     *
+     * @param knockout The knock-out strategy that should be used to knock-out node 2.
+     * @param storageEngine Storage engine for the TEST table.
+     * @see NodeKnockout
+     */
+    private void prepareClusterForInstallingSnapshotToNode2(NodeKnockout knockout, String storageEngine) throws InterruptedException {
+        prepareClusterForInstallingSnapshotToNode2(knockout, storageEngine, theCluster -> {});
+    }
+
+    /**
+     * Transfer the cluster to a state in which, when node 2 is reanimated from being knocked-out, the only partition
+     * of the only table (called TEST) is transferred to it using RAFT snapshot installation mechanism.
+     *
+     * @param knockout The knock-out strategy that should be used to knock-out node 2.
+     * @param storageEngine Storage engine for the TEST table.
+     * @param doOnClusterAfterInit Action executed just after the cluster is started and initialized.
+     * @see NodeKnockout
+     */
+    private void prepareClusterForInstallingSnapshotToNode2(
+            NodeKnockout knockout,
+            String storageEngine,
+            Consumer<Cluster> doOnClusterAfterInit
+    ) throws InterruptedException {
+        cluster.startAndInit(3);
+
+        doOnClusterAfterInit.accept(cluster);
+
+        createTestTableWith3Replicas(storageEngine);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        cluster.knockOutNode(2, knockout);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session);
+        });
+
+        // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
+        causeLogTruncationOnSolePartitionLeader();
+    }
+
+    private void createTestTableWith3Replicas(String storageEngine) throws InterruptedException {
+        String sql = "create table test (key int primary key, value varchar(20))"
+                + (DEFAULT_STORAGE_ENGINE.equals(storageEngine) ? "" : " engine " + storageEngine)
+                + " with partitions=1, replicas=3";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(sql, session);
+        });
+
+        waitForTableToStart();
+    }
+
+    private void waitForTableToStart() throws InterruptedException {
+        // TODO: IGNITE-18203 - remove this wait because when a table creation query is executed, the table must be fully ready.
+
+        BooleanSupplier tableStarted = () -> {
+            int numberOfStartedRaftNodes = cluster.runningNodes()
+                    .map(ItTableRaftSnapshotsTest::tablePartitionIds)
+                    .mapToInt(List::size)
+                    .sum();
+            return numberOfStartedRaftNodes == 3;
+        };
+
+        assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all table RAFT nodes started");
+    }
+
+    /**
+     * Causes log truncation on the RAFT leader of the sole table partition that exists in the cluster.
+     * After such truncation, when a knocked-out follower gets reanimated, the leader will not be able to feed it
+     * with AppendEntries (because the leader does not already have the index that is required to send AppendEntries
+     * to the lagging follower), so the leader will have to send InstallSnapshot instead.
+     */
+    private void causeLogTruncationOnSolePartitionLeader() throws InterruptedException {
+        // Doing this twice because first snapshot creation does not trigger log truncation.
+        doSnapshotOnSolePartitionLeader();
+        doSnapshotOnSolePartitionLeader();
+    }
+
+    /**
+     * Causes a RAFT snapshot to be taken on the RAFT leader of the sole table partition that exists in the cluster.
+     */
+    private void doSnapshotOnSolePartitionLeader() throws InterruptedException {
+        TablePartitionId tablePartitionId = solePartitionId();
+
+        doSnapshotOn(tablePartitionId);
+    }
+
+    /**
+     * Returns the ID of the sole table partition that exists in the cluster.
+     */
+    private TablePartitionId solePartitionId() {
+        List<TablePartitionId> tablePartitionIds = tablePartitionIds(cluster.aliveNode());
+
+        assertThat(tablePartitionIds.size(), is(1));
+
+        return tablePartitionIds.get(0);
+    }
+
+    /**
+     * Returns the IDs of all table partitions that exist on the given node.
+     */
+    private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+        return node.raftManager().localNodes().stream()
+                .map(RaftNodeId::groupId)
+                .filter(TablePartitionId.class::isInstance)
+                .map(TablePartitionId.class::cast)
+                .collect(toList());
+    }
+
+    /**
+     * Takes a RAFT snapshot on the leader of the RAFT group corresponding to the given table partition.
+     */
+    private void doSnapshotOn(TablePartitionId tablePartitionId) throws InterruptedException {
+        RaftGroupService raftGroupService = cluster.leaderServiceFor(tablePartitionId);
+
+        CountDownLatch snapshotLatch = new CountDownLatch(1);
+        AtomicReference<Status> snapshotStatus = new AtomicReference<>();
+
+        raftGroupService.getRaftNode().snapshot(status -> {
+            snapshotStatus.set(status);
+            snapshotLatch.countDown();
+        });
+
+        assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time");
+
+        assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get());
+    }
+
+    /**
+     * Reanimates (that is, reverts the effects of a knock out) node 2 and waits until a RAFT snapshot is installed
+     * on it for the sole table partition in the cluster.
+     */
+    private void reanimateNode2AndWaitForSnapshotInstalled(NodeKnockout knockout) throws InterruptedException {
+        reanimateNodeAndWaitForSnapshotInstalled(2, knockout);
+    }
+
+    /**
+     * Reanimates (that is, reverts the effects of a knock out) a node with the given index and waits until a RAFT snapshot is installed
+     * on it for the sole table partition in the cluster.
+     */
+    private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, NodeKnockout knockout) throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
+
+        Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
+
+        var handler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().matches("Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true")) {
+                    snapshotInstalledLatch.countDown();
+                }
+            }
+        };
+
+        replicatorLogger.addHandler(handler);
+
+        try {
+            cluster.reanimateNode(nodeIndex, knockout);
+
+            assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
+        } finally {
+            replicatorLogger.removeHandler(handler);
+        }
+    }
+
+    private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException {
+        String nodeConsistentId = cluster.node(nodeIndex).node().name();
+
+        int maxAttempts = 3;
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            boolean transferred = tryTransferLeadershipOnSolePartitionTo(nodeConsistentId);
+
+            if (transferred) {
+                break;
+            }
+
+            if (attempt < maxAttempts) {
+                LOG.info("Did not transfer leadership after " + attempt + " attempts, going to retry...");
+            } else {
+                fail("Did not transfer leadership in time after " + maxAttempts + " attempts");
+            }
+        }
+    }
+
+    private boolean tryTransferLeadershipOnSolePartitionTo(String targetLeaderConsistentId) throws InterruptedException {
+        NodeImpl leaderBeforeTransfer = (NodeImpl) cluster.leaderServiceFor(solePartitionId()).getRaftNode();
+
+        initiateLeadershipTransferTo(targetLeaderConsistentId, leaderBeforeTransfer);
+
+        BooleanSupplier leaderTransferred = () -> {
+            PeerId leaderId = leaderBeforeTransfer.getLeaderId();
+            return leaderId != null && leaderId.getConsistentId().equals(targetLeaderConsistentId);
+        };
+
+        return waitForCondition(leaderTransferred, 10_000);
+    }
+
+    private static void initiateLeadershipTransferTo(String targetLeaderConsistentId, NodeImpl leaderBeforeTransfer) {
+        long startedMillis = System.currentTimeMillis();
+
+        while (true) {
+            Status status = leaderBeforeTransfer.transferLeadershipTo(new PeerId(targetLeaderConsistentId));
+
+            if (status.getRaftError() != RaftError.EBUSY) {
+                break;
+            }
+
+            if (System.currentTimeMillis() - startedMillis > 10_000) {
+                throw new IllegalStateException("Could not initiate leadership transfer to " + targetLeaderConsistentId + " in time");
+            }
+        }
+    }
+
+    /**
+     * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole
+     * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen).
+     *
+     * <p>{@link NodeKnockout#STOP} is used to knock out the follower which will accept the snapshot.
+     */
+    @Test
+    @Disabled("Enable when the IGNITE-18170 deadlock is resolved")
+    void txSemanticsIsMaintainedWithKnockoutStop() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(Cluster.NodeKnockout.STOP);
+    }
+
+    /**
+     * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole
+     * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen).
+     *
+     * <p>{@link NodeKnockout#PARTITION_NETWORK} is used to knock out the follower which will accept the snapshot.
+     */
+    @Test
+    void txSemanticsIsMaintainedWithKnockoutPartitionNetwork() throws Exception {
+        txSemanticsIsMaintainedAfterInstallingSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK);
+    }
+
+    /**
+     * Tests that, if first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole
+     * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e. its effects are seen).
+     *
+     * <p>The given {@link NodeKnockout} is used to knock out the follower which will accept the snapshot.
+     */
+    private void txSemanticsIsMaintainedAfterInstallingSnapshot(NodeKnockout knockout) throws Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
+        transferLeadershipOnSolePartitionTo(0);
+
+        Transaction tx = cluster.node(0).transactions().begin();
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("insert into test(key, value) values (1, 'one')", session, tx);
+
+            cluster.knockOutNode(2, knockout);
+
+            tx.commit();
+        });
+
+        // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    /**
+     * Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines.
+     */
+    // TODO: IGNITE-18481 - make sure we don't forget to add new storage engines here
+    @ParameterizedTest
+    @ValueSource(strings = {
+            RocksDbStorageEngine.ENGINE_NAME,
+            PersistentPageMemoryStorageEngine.ENGINE_NAME,
+            VolatilePageMemoryStorageEngine.ENGINE_NAME
+    })
+    void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
+        testLeaderFeedsFollowerWithSnapshot(DEFAULT_KNOCKOUT, storageEngine);
+    }
+
+    /**
+     * Tests that entries can still be added to a follower using AppendEntries after it gets fed with a RAFT snapshot.
+     */
+    @Test
+    @Disabled("Enable when IGNITE-18485 is fixed")
+    void entriesKeepAppendedAfterSnapshotInstallation() throws Exception {
+        feedNode2WithSnapshotOfOneRow(DEFAULT_KNOCKOUT);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test order by key",
+                ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    /**
+     * Tests that, if commands are added to a leader while it installs a RAFT snapshot on a follower, these commands
+     * reach the follower and get applied after the snapshot is installed.
+     */
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void entriesKeepAppendedDuringSnapshotInstallation() throws Exception {
+        NodeKnockout knockout = DEFAULT_KNOCKOUT;
+
+        prepareClusterForInstallingSnapshotToNode2(knockout);
+
+        AtomicBoolean installedSnapshot = new AtomicBoolean(false);
+        AtomicInteger lastLoadedKey = new AtomicInteger();
+
+        CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
+            for (int i = 2; !installedSnapshot.get(); i++) {
+                int key = i;
+                cluster.doInSession(0, session -> {
+                    executeUpdate("insert into test(key, value) values (" + key + ", 'extra')", session);
+                    lastLoadedKey.set(key);
+                });
+            }
+        });
+
+        reanimateNode2AndWaitForSnapshotInstalled(knockout);
+
+        installedSnapshot.set(true);
+
+        assertThat(loadingFuture, willSucceedIn(30, TimeUnit.SECONDS));
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        List<Integer> keys = queryWithRetry(2, "select * from test order by key", ItTableRaftSnapshotsTest::readRows)
+                .stream().map(IgniteBiTuple::get1).collect(toList());
+
+        assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+    }
+
+    /**
+     * Tests that, after a node gets a RAFT snapshot installed to it, and it switches to a leader, it can act as a leader
+     * (and can install a RAFT snapshot on the ex-leader).
+     */
+    @Test
+    // TODO: IGNITE-18423 - enable when ReplicationTimeoutException is fixed
+    @Disabled("IGNITE-18423")
+    void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception {
+        feedNode2WithSnapshotOfOneRow(DEFAULT_KNOCKOUT);
+
+        // The leader (0) has fed the follower (2). Now, change roles: the new leader will be node 2, it will feed node 0.
+
+        transferLeadershipOnSolePartitionTo(2);
+
+        cluster.knockOutNode(0, DEFAULT_KNOCKOUT);
+
+        cluster.doInSession(2, session -> {
+            executeUpdate("insert into test(key, value) values (2, 'two')", session);
+        });
+
+        // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
+        causeLogTruncationOnSolePartitionLeader();
+
+        reanimateNodeAndWaitForSnapshotInstalled(0, DEFAULT_KNOCKOUT);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(0, "select * from test order by key",
+                ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+    }
+
+    /**
+     * Tests that, if a snapshot installation fails for some reason, a subsequent retry due to a timeout happens successfully.
+     */
+    @Test
+    void snapshotInstallationRepeatsOnTimeout() throws Exception {
+        prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT, DEFAULT_STORAGE_ENGINE, theCluster -> {
+            theCluster.node(0).dropMessages(dropFirstSnapshotMetaResponse());
+        });
+
+        reanimateNode2AndWaitForSnapshotInstalled(DEFAULT_KNOCKOUT);
+    }
+
+    private BiPredicate<String, NetworkMessage> dropFirstSnapshotMetaResponse() {
+        AtomicBoolean sentSnapshotMetaResponse = new AtomicBoolean(false);
+
+        return dropFirstSnapshotMetaResponse(
+                sentSnapshotMetaResponse);
+    }
+
+    private BiPredicate<String, NetworkMessage> dropFirstSnapshotMetaResponse(AtomicBoolean sentSnapshotMetaResponse) {
+        return (targetConsistentId, message) -> {
+            if (Objects.equals(targetConsistentId, cluster.node(2).name()) && message instanceof SnapshotMetaResponse) {
+                return sentSnapshotMetaResponse.compareAndSet(false, true);
+            } else {
+                return false;
+            }
+        };
+    }
+
+    /**
+     * This is a test for a tricky scenario:
+     *
+     * <ol>
+     *     <li>First InstallSnapshot request is sent, its processing starts hanging forever (it will be cancelled on step 3</li>
+     *     <li>After a timeout, second InstallSnapshot request is sent with same index+term as the first had; in JRaft, it causes
+     *     a special handling (previous request processing is NOT cancelled)</li>
+     *     <li>After a timeout, third InstallSnapshot request is sent with DIFFERENT index, so it cancels the first snapshot processing
+     *     effectively unblocking the first thread</li>
+     * </ol>
+     *
+     * <p>In the original JRaft implementation, after being unblocked, the first thread fails to clean up, so subsequent retries will
+     * always see a phantom of an unfinished snapshot, so the snapshotting process will be jammed. Also, node stop might
+     * stuck because one 'download' task will remain unfinished forever.
+     */
+    @Test
+    void snapshotInstallTimeoutDoesNotBreakSubsequentInstallsWhenSecondAttemptIsIdenticalToFirst() throws Exception {
+        AtomicBoolean snapshotInstallFailedDueToIdenticalRetry = new AtomicBoolean(false);
+
+        Logger snapshotExecutorLogger = Logger.getLogger(SnapshotExecutorImpl.class.getName());
+
+        var snapshotInstallFailedDueToIdenticalRetryHandler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().contains("Register DownloadingSnapshot failed: interrupted by retry installing request")) {
+                    snapshotInstallFailedDueToIdenticalRetry.set(true);
+                }
+            }
+        };
+
+        snapshotExecutorLogger.addHandler(snapshotInstallFailedDueToIdenticalRetryHandler);
+
+        try {
+            prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT, DEFAULT_STORAGE_ENGINE, theCluster -> {
+                BiPredicate<String, NetworkMessage> dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed = (recipientId, message) ->
+                        message instanceof ActionRequest
+                                && ((ActionRequest) message).command() instanceof SafeTimeSyncCommand
+                                && !snapshotInstallFailedDueToIdenticalRetry.get();
+
+                theCluster.node(0).dropMessages(
+                        dropFirstSnapshotMetaResponse().or(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed)
+                );
+
+                theCluster.node(1).dropMessages(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed);
+                theCluster.node(2).dropMessages(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed);
+            });
+
+            reanimateNode2AndWaitForSnapshotInstalled(DEFAULT_KNOCKOUT);
+        } finally {
+            snapshotExecutorLogger.removeHandler(snapshotInstallFailedDueToIdenticalRetryHandler);
+        }
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6d5a560e48..5d13c09b4d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -822,11 +822,20 @@ public class IgniteImpl implements Ignite {
         return logicalTopologyService;
     }
 
+    // TODO: IGNITE-18493 - remove/move this
     @TestOnly
     public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
         ((DefaultMessagingService) clusterSvc.messagingService()).dropMessages(predicate);
     }
 
+    // TODO: IGNITE-18493 - remove/move this
+    @TestOnly
+    @Nullable
+    public BiPredicate<String, NetworkMessage> dropMessagesPredicate() {
+        return ((DefaultMessagingService) clusterSvc.messagingService()).dropMessagesPredicate();
+    }
+
+    // TODO: IGNITE-18493 - remove/move this
     @TestOnly
     public void stopDroppingMessages() {
         ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index a26ed44a2f..e999e522b7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -78,7 +78,9 @@ public class PartitionAccessImpl implements PartitionAccess {
     public CompletableFuture<MvPartitionStorage> reCreateMvPartitionStorage() throws StorageException {
         assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        return mvTableStorage.destroyPartition(partId())
+        // TODO: IGNITE-18030 - actually recreate or do in a different way
+        //return mvTableStorage.destroyPartition(partId())
+        return CompletableFuture.completedFuture(null)
                 .thenApply(unused -> mvTableStorage.getOrCreateMvPartition(partId()));
     }
 
@@ -86,7 +88,8 @@ public class PartitionAccessImpl implements PartitionAccess {
     public TxStateStorage reCreateTxStatePartitionStorage() throws StorageException {
         assert txStateTableStorage.getTxStateStorage(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-        txStateTableStorage.destroyTxStateStorage(partId());
+        // TODO: IGNITE-18030 - actually recreate or do in a different way
+        //txStateTableStorage.destroyTxStateStorage(partId());
 
         return txStateTableStorage.getOrCreateTxStateStorage(partId());
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index efaade973b..24224f92c5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 import java.io.IOException;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -30,6 +32,8 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
  * {@link SnapshotReader} implementation for reading local rebalance snapshot.
  */
 public class OutgoingSnapshotReader extends SnapshotReader {
+    private static final IgniteLogger LOG = Loggers.forClass(OutgoingSnapshotReader.class);
+
     /** Snapshot id. */
     private final UUID id = UUID.randomUUID();
 
@@ -48,6 +52,8 @@ public class OutgoingSnapshotReader extends SnapshotReader {
 
         snapshot = new OutgoingSnapshot(id, snapshotStorage.partition());
 
+        LOG.info("Starting snapshot reader for snapshot {}", id);
+
         snapshotStorage.outgoingSnapshotsManager().startOutgoingSnapshot(id, snapshot);
     }
 
@@ -65,6 +71,8 @@ public class OutgoingSnapshotReader extends SnapshotReader {
 
     @Override
     public void close() throws IOException {
+        LOG.info("Closing snapshot reader for snapshot {}", id);
+
         snapshotStorage.outgoingSnapshotsManager().finishOutgoingSnapshot(id);
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 06de01ef24..673a71369c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -188,10 +188,12 @@ public class IncomingSnapshotCopierTest {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
+        //verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
         verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
 
-        verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
+        // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
+        //verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
         verify(incomingTxStateTableStorage, times(2)).getOrCreateTxStateStorage(eq(TEST_PARTITION));
     }