You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/10/27 12:36:25 UTC

[ignite-3] branch main updated: IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 93914175c5 IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)
93914175c5 is described below

commit 93914175c5ce1c55d123c6a2225ff861eee2f8f5
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Oct 27 15:36:20 2022 +0300

    IGNITE-17894 Implement RAFT snapshot streaming receiver (#1233)
---
 .../org/apache/ignite/internal/util/Cursor.java    |  11 +
 .../org/apache/ignite/internal/storage/RowId.java  |  30 +-
 .../internal/storage/engine/MvTableStorage.java    |   2 +-
 .../storage/impl/TestMvPartitionStorage.java       |   4 +-
 .../internal/storage/impl/TestMvTableStorage.java  |   4 +-
 .../pagememory/AbstractPageMemoryTableStorage.java |   5 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  36 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   6 +-
 .../internal/table/distributed/TableManager.java   | 302 ++++++++++------
 .../distributed/raft/snapshot/PartitionAccess.java |  35 +-
 .../raft/snapshot/PartitionAccessImpl.java         |  82 +++--
 .../distributed/raft/snapshot/PartitionKey.java    |   5 +-
 .../raft/snapshot/PartitionSnapshotStorage.java    |  26 +-
 .../snapshot/PartitionSnapshotStorageFactory.java  |  19 +-
 .../distributed/raft/snapshot/SnapshotUri.java     |   4 +-
 .../snapshot/incoming/IncomingSnapshotCopier.java  | 391 ++++++++++++++++++---
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |  12 +-
 .../snapshot/outgoing/OutgoingSnapshotReader.java  |   2 +-
 .../raft/snapshot/PartitionAccessImplTest.java     |  86 -----
 .../incoming/IncomingSnapshotCopierTest.java       | 382 ++++++++++++++++++++
 .../snapshot/outgoing/OutgoingSnapshotTest.java    |  69 ++--
 .../internal/tx/storage/state/TxStateStorage.java  |   5 +
 .../tx/storage/state/TxStateTableStorage.java      |   4 +-
 .../state/rocksdb/TxStateRocksDbStorage.java       |  30 +-
 .../state/rocksdb/TxStateRocksDbTableStorage.java  |  52 ++-
 .../test/TestConcurrentHashMapTxStateStorage.java  |  21 +-
 .../TestConcurrentHashMapTxStateTableStorage.java  |  36 +-
 27 files changed, 1183 insertions(+), 478 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
index e4a6f53a07..b08798ddcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
@@ -64,6 +64,17 @@ public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
         };
     }
 
+    /**
+     * Creates an iterable based cursor.
+     *
+     * @param iterable Iterable.
+     * @param <T> Type of elements.
+     * @return Cursor.
+     */
+    static <T> Cursor<T> fromIterable(Iterable<? extends T> iterable) {
+        return fromIterator(iterable.iterator());
+    }
+
     /**
      * Returns a sequential Stream over the elements covered by this cursor.
      *
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
index 25dd5c4323..e38e3759fa 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowId.java
@@ -23,15 +23,15 @@ import org.apache.ignite.internal.tx.Timestamp;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Class that represents row id in primary index of the table. Contains a timestamp-based UUID and a partition id.
+ * Class that represents row ID in primary index of the table. Contains a timestamp-based UUID and a partition ID.
  *
  * @see MvPartitionStorage
  */
 public final class RowId implements Serializable, Comparable<RowId> {
-    /** Partition id. Short type reduces payload when transferring an object over network. */
+    /** Partition ID. Short type reduces payload when transferring an object over network. */
     private final short partitionId;
 
-    /** Unique id. */
+    /** Unique ID. */
     private final UUID uuid;
 
     public static RowId lowestRowId(int partitionId) {
@@ -39,9 +39,9 @@ public final class RowId implements Serializable, Comparable<RowId> {
     }
 
     /**
-     * Create a row id with the UUID value based on {@link Timestamp}.
+     * Create a row ID with the UUID value based on {@link Timestamp}.
      *
-     * @param partitionId Partition id.
+     * @param partitionId Partition ID.
      */
     public RowId(int partitionId) {
         this(partitionId, Timestamp.nextVersion().toUuid());
@@ -50,7 +50,7 @@ public final class RowId implements Serializable, Comparable<RowId> {
     /**
      * Constructor.
      *
-     * @param partitionId Partition id.
+     * @param partitionId Partition ID.
      * @param mostSignificantBits UUID's most significant bits.
      * @param leastSignificantBits UUID's least significant bits.
      */
@@ -58,27 +58,33 @@ public final class RowId implements Serializable, Comparable<RowId> {
         this(partitionId, new UUID(mostSignificantBits, leastSignificantBits));
     }
 
-    private RowId(int partitionId, UUID uuid) {
+    /**
+     * Constructor.
+     *
+     * @param partitionId Partition ID.
+     * @param uuid UUID.
+     */
+    public RowId(int partitionId, UUID uuid) {
         this.partitionId = (short) partitionId;
         this.uuid = uuid;
     }
 
     /**
-     * Returns a partition id for current row id.
+     * Returns a partition ID for current row ID.
      */
     public int partitionId() {
         return partitionId & 0xFFFF;
     }
 
     /**
-     * Returns the most significant 64 bits of row id's UUID.
+     * Returns the most significant 64 bits of row ID's UUID.
      */
     public long mostSignificantBits() {
         return uuid.getMostSignificantBits();
     }
 
     /**
-     * Returns the least significant 64 bits of row id's UUID.
+     * Returns the least significant 64 bits of row ID's UUID.
      */
     public long leastSignificantBits() {
         return uuid.getLeastSignificantBits();
@@ -86,8 +92,6 @@ public final class RowId implements Serializable, Comparable<RowId> {
 
     /**
      * Returns the UUID equivalent of {@link #mostSignificantBits()} and {@link #leastSignificantBits()}.
-     *
-     * @return UUID.
      */
     public UUID uuid() {
         return uuid;
@@ -131,7 +135,7 @@ public final class RowId implements Serializable, Comparable<RowId> {
     }
 
     /**
-     * Returns the next row id withing a single partition, or {@code null} if current row id already has maximal possible value.
+     * Returns the next row ID withing a single partition, or {@code null} if current row ID already has maximal possible value.
      */
     public @Nullable RowId increment() {
         long lsb = uuid.getLeastSignificantBits() + 1;
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index d5b06a103c..386c7db7b9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -63,7 +63,7 @@ public interface MvTableStorage {
      * @throws IllegalArgumentException If Partition ID is out of bounds.
      * @throws StorageException If an error has occurred during the partition destruction.
      */
-    CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException;
+    void destroyPartition(int partitionId) throws StorageException;
 
     /**
      * Returns an already created Index (either Sorted or Hash) with the given name or creates a new one if it does not exist.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 50e27bd297..6a01278bb4 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
 public class TestMvPartitionStorage implements MvPartitionStorage {
     private final ConcurrentNavigableMap<RowId, VersionChain> map = new ConcurrentSkipListMap<>();
 
-    private long lastAppliedIndex = 0;
+    private volatile long lastAppliedIndex;
 
     private final int partitionId;
 
@@ -106,8 +106,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
     /** {@inheritDoc} */
     @Override
     public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
-        assert lastAppliedIndex > this.lastAppliedIndex : "current=" + this.lastAppliedIndex + ", new=" + lastAppliedIndex;
-
         this.lastAppliedIndex = lastAppliedIndex;
     }
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 27a7c94a8c..9293496fad 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -100,15 +100,13 @@ public class TestMvTableStorage implements MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+    public void destroyPartition(int partitionId) throws StorageException {
         Integer boxedPartitionId = partitionId;
 
         partitions.remove(boxedPartitionId);
 
         sortedIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(boxedPartitionId));
         hashIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(boxedPartitionId));
-
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 76db11640e..969a798710 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -130,7 +130,7 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+    public void destroyPartition(int partitionId) throws StorageException {
         assert started : "Storage has not started yet";
 
         MvPartitionStorage partition = getMvPartition(partitionId);
@@ -141,9 +141,6 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
             // TODO: IGNITE-17197 Actually destroy the partition.
             //partition.destroy();
         }
-
-        // TODO: IGNITE-17197 Convert this to true async code.
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 58b1530379..1ba1f0763c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -173,7 +173,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         return meta.columnFamily().handle();
     }
 
-    /** {@inheritDoc} */
     @Override
     public TableConfiguration configuration() {
         return tableCfg;
@@ -184,7 +183,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         return tablesCfg;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void start() throws StorageException {
         flusher = new RocksDbFlusher(
@@ -306,7 +304,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void stop() throws StorageException {
         if (!stopGuard.compareAndSet(false, true)) {
@@ -345,7 +342,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void destroy() throws StorageException {
         stop();
@@ -353,7 +349,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         IgniteUtils.deleteIfExists(tablePath);
     }
 
-    /** {@inheritDoc} */
     @Override
     public RocksDbMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
         RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
@@ -371,7 +366,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         return partition;
     }
 
-    /** {@inheritDoc} */
     @Override
     public @Nullable RocksDbMvPartitionStorage getMvPartition(int partitionId) {
         checkPartitionId(partitionId);
@@ -379,32 +373,24 @@ public class RocksDbTableStorage implements MvTableStorage {
         return partitions.get(partitionId);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> destroyPartition(int partitionId) throws StorageException {
+    public void destroyPartition(int partitionId) throws StorageException {
         checkPartitionId(partitionId);
 
         RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
 
-        if (mvPartition == null) {
-            return CompletableFuture.completedFuture(null);
-        }
+        if (mvPartition != null) {
+            //TODO IGNITE-17626 Destroy indexes as well...
+            mvPartition.destroy();
 
-        //TODO IGNITE-17626 Destroy indexes as well...
-        mvPartition.destroy();
-
-        // Wait for the data to actually be removed from the disk and close the storage.
-        return awaitFlush(false)
-                .whenComplete((v, e) -> {
-                    try {
-                        mvPartition.close();
-                    } catch (Exception ex) {
-                        LOG.error("Error when closing partition storage for partId = {}", ex, partitionId);
-                    }
-                });
+            try {
+                mvPartition.close();
+            } catch (Exception e) {
+                throw new StorageException("Error when closing partition storage for the partition: " + partitionId, e);
+            }
+        }
     }
 
-    /** {@inheritDoc} */
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
         SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
@@ -452,7 +438,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         return storages.getOrCreateStorage(partitionStorage);
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> destroyIndex(UUID indexId) {
         HashIndex hashIdx = hashIndices.remove(indexId);
@@ -479,7 +464,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isVolatile() {
         return false;
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 038404585b..b6a54467de 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -27,7 +26,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.nio.file.Path;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -108,13 +106,11 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
 
         partitionStorage1.runConsistently(() -> partitionStorage1.addWrite(rowId1, testData, txId, UUID.randomUUID(), 0));
 
-        CompletableFuture<Void> destroyFuture = tableStorage.destroyPartition(PARTITION_ID_0);
+        tableStorage.destroyPartition(PARTITION_ID_0);
 
         // Partition destruction doesn't enforce flush.
         ((RocksDbTableStorage) tableStorage).awaitFlush(true);
 
-        assertThat(destroyFuture, willCompleteSuccessfully());
-
         assertThat(tableStorage.getMvPartition(PARTITION_ID_0), is(nullValue()));
         assertThat(tableStorage.getOrCreateMvPartition(PARTITION_ID_0).read(rowId0, HybridTimestamp.MAX_VALUE).binaryRow(),
                 is(nullValue()));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b892fb396a..60f6bda20e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -169,6 +170,14 @@ import org.jetbrains.annotations.TestOnly;
  */
 public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
         IgniteComponent {
+    /**
+     * The special value of the last applied index to indicate the beginning of a full data rebalancing.
+     *
+     * @see MvPartitionStorage#lastAppliedIndex()
+     * @see TxStateStorage#lastAppliedIndex()
+     */
+    public static final long FULL_RABALANCING_STARTED = -1;
+
     private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
 
     // TODO get rid of this in future? IGNITE-17307
@@ -286,6 +295,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Assignment change event listeners. */
     private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
 
+    /** Incoming RAFT snapshots executor. */
+    private final ExecutorService incomingSnapshotsExecutor;
+
     /** Rebalance scheduler pool size. */
     private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
@@ -413,6 +425,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "tableManager-io", LOG));
+
+        incomingSnapshotsExecutor = new ThreadPoolExecutor(
+                Utils.cpus(),
+                Utils.cpus(),
+                100,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG)
+        );
     }
 
     /** {@inheritDoc} */
@@ -680,10 +701,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 MvTableStorage storage = internalTbl.storage();
                 boolean isInMemory = storage.isVolatile();
 
-                // TODO: IGNITE-17197 Remove assert after the ticket is resolved.
-                assert internalTbl.storage() instanceof MvTableStorage :
-                        "Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName();
-
                 // start new nodes, only if it is table creation
                 // other cases will be covered by rebalance logic
                 Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptySet();
@@ -697,10 +714,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
 
                 if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
-                    startGroupFut = CompletableFuture
-                            .supplyAsync(() -> getOrCreateMvPartition(internalTbl, partId), ioExecutor)
-                            .thenComposeAsync((partitionStorage) -> {
-                                boolean hasData = partitionStorage.lastAppliedIndex() > 0;
+                    startGroupFut = CompletableFuture.supplyAsync(() -> getOrCreateMvPartition(internalTbl.storage(), partId), ioExecutor)
+                            .thenComposeAsync(mvPartitionStorage -> {
+                                boolean hasData = mvPartitionStorage.lastAppliedIndex() > 0;
 
                                 CompletableFuture<Boolean> fut;
 
@@ -732,42 +748,52 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     fut = CompletableFuture.completedFuture(true);
                                 }
 
-                                return fut.thenComposeAsync(startGroup -> {
+                                return fut.thenCompose(startGroup -> {
                                     if (!startGroup) {
                                         return CompletableFuture.completedFuture(null);
                                     }
 
-                                    RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, partId, tblCfg, partitionStorage,
-                                            newPartAssignment);
-
-                                    try {
-                                        raftMgr.startRaftGroupNode(
-                                                replicaGrpId,
-                                                newPartAssignment,
-                                                new PartitionListener(
-                                                        partitionDataStorage(partitionStorage, internalTbl, partId),
-                                                        internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
-                                                        txManager,
-                                                        primaryIndex
-                                                ),
-                                                new RebalanceRaftGroupEventsListener(
-                                                        metaStorageMgr,
-                                                        tablesCfg.tables().get(tablesById.get(tblId).name()),
-                                                        replicaGrpId,
-                                                        partId,
-                                                        busyLock,
-                                                        movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
-                                                        this::calculateAssignments,
-                                                        rebalanceScheduler
-                                                ),
-                                                groupOptions
-                                        );
-
-                                        return CompletableFuture.completedFuture(null);
-                                    } catch (NodeStoppingException ex) {
-                                        return CompletableFuture.failedFuture(ex);
-                                    }
-                                }, ioExecutor);
+                                    return CompletableFuture.supplyAsync(
+                                                    () -> getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
+                                                    ioExecutor
+                                            )
+                                            .thenComposeAsync(txStatePartitionStorage -> {
+                                                RaftGroupOptions groupOptions = groupOptionsForPartition(
+                                                        internalTbl.storage(),
+                                                        internalTbl.txStateStorage(),
+                                                        partitionKey(internalTbl, partId),
+                                                        newPartAssignment
+                                                );
+
+                                                try {
+                                                    raftMgr.startRaftGroupNode(
+                                                            replicaGrpId,
+                                                            newPartAssignment,
+                                                            new PartitionListener(
+                                                                    partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+                                                                    txStatePartitionStorage,
+                                                                    txManager,
+                                                                    primaryIndex
+                                                            ),
+                                                            new RebalanceRaftGroupEventsListener(
+                                                                    metaStorageMgr,
+                                                                    tablesCfg.tables().get(tablesById.get(tblId).name()),
+                                                                    replicaGrpId,
+                                                                    partId,
+                                                                    busyLock,
+                                                                    movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
+                                                                    this::calculateAssignments,
+                                                                    rebalanceScheduler
+                                                            ),
+                                                            groupOptions
+                                                    );
+
+                                                    return CompletableFuture.completedFuture(null);
+                                                } catch (NodeStoppingException ex) {
+                                                    return CompletableFuture.failedFuture(ex);
+                                                }
+                                            }, ioExecutor);
+                                });
                             }, ioExecutor);
                 }
 
@@ -779,36 +805,47 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                 return CompletableFuture.failedFuture(ex);
                             }
                         }, ioExecutor)
-                        .thenAccept(
-                                updatedRaftGroupService -> {
-                                    ((InternalTableImpl) internalTbl)
-                                            .updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
-
-                                    if (replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
-                                        MvPartitionStorage partitionStorage = getOrCreateMvPartition(internalTbl, partId);
-
-                                        try {
-                                            replicaMgr.startReplica(replicaGrpId,
-                                                    new PartitionReplicaListener(
-                                                            partitionStorage,
-                                                            updatedRaftGroupService,
-                                                            txManager,
-                                                            lockMgr,
-                                                            partId,
-                                                            tblId,
-                                                            primaryIndex,
-                                                            clock,
-                                                            internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
-                                                            topologyService,
-                                                            placementDriver
-                                                    )
-                                            );
-                                        } catch (NodeStoppingException ex) {
-                                            throw new AssertionError("Loza was stopped before Table manager", ex);
-                                        }
-                                    }
-                                }
-                        ).exceptionally(th -> {
+                        .thenCompose(updatedRaftGroupService -> {
+                            ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
+
+                            if (replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
+                                return CompletableFuture.supplyAsync(
+                                        () -> getOrCreateMvPartition(internalTbl.storage(), partId),
+                                                ioExecutor
+                                        )
+                                        .thenCombine(
+                                                CompletableFuture.supplyAsync(
+                                                        () -> getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
+                                                        ioExecutor
+                                                ),
+                                                (mvPartitionStorage, txStatePartitionStorage) -> {
+                                                    try {
+                                                        replicaMgr.startReplica(replicaGrpId,
+                                                                new PartitionReplicaListener(
+                                                                        mvPartitionStorage,
+                                                                        updatedRaftGroupService,
+                                                                        txManager,
+                                                                        lockMgr,
+                                                                        partId,
+                                                                        tblId,
+                                                                        primaryIndex,
+                                                                        clock,
+                                                                        txStatePartitionStorage,
+                                                                        topologyService,
+                                                                        placementDriver
+                                                                )
+                                                        );
+                                                    } catch (NodeStoppingException ex) {
+                                                        throw new AssertionError("Loza was stopped before Table manager", ex);
+                                                    }
+
+                                                    return null;
+                                                });
+                            } else {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                        })
+                        .exceptionally(th -> {
                             LOG.warn("Unable to update raft groups on the node", th);
 
                             return null;
@@ -821,18 +858,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         CompletableFuture.allOf(futures).join();
     }
 
-    private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) {
-        return new SnapshotAwarePartitionDataStorage(partitionStorage, outgoingSnapshotsManager, partitionKey(internalTbl, partId));
-    }
-
-    private PartitionKey partitionKey(InternalTable internalTbl, int partId) {
-        return new PartitionKey(internalTbl.tableId(), partId);
-    }
-
-    private static MvPartitionStorage getOrCreateMvPartition(InternalTable internalTbl, int partId) {
-        return internalTbl.storage().getOrCreateMvPartition(partId);
-    }
-
     /**
      * Calculates the quantity of the data nodes for the partition of the table.
      *
@@ -860,15 +885,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     }
 
     private RaftGroupOptions groupOptionsForPartition(
-            InternalTable internalTbl,
-            int partId,
-            ExtendedTableConfiguration tableConfig,
-            MvPartitionStorage partitionStorage,
+            MvTableStorage mvTableStorage,
+            TxStateTableStorage txStateTableStorage,
+            PartitionKey partitionKey,
             Set<ClusterNode> peers
     ) {
         RaftGroupOptions raftGroupOptions;
 
-        if (internalTbl.storage().isVolatile()) {
+        if (mvTableStorage.isVolatile()) {
             raftGroupOptions = RaftGroupOptions.forVolatileStores()
                     .setLogStorageFactory(volatileLogStorageFactoryCreator.factory(raftMgr.volatileRaft().logStorage().value()))
                     .raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage());
@@ -881,9 +905,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 raftMgr.topologyService(),
                 //TODO IGNITE-17302 Use miniumum from mv storage and tx state storage.
                 outgoingSnapshotsManager,
-                new PartitionAccessImpl(partitionKey(internalTbl, partId), partitionStorage),
+                new PartitionAccessImpl(partitionKey, mvTableStorage, txStateTableStorage),
                 peers.stream().map(n -> new Peer(n.address())).map(PeerId::fromPeer).map(Object::toString).collect(Collectors.toList()),
-                List.of()
+                List.of(),
+                incomingSnapshotsExecutor
         ));
 
         return raftGroupOptions;
@@ -910,6 +935,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
         shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS);
         shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(incomingSnapshotsExecutor, 10, TimeUnit.SECONDS);
     }
 
     /**
@@ -1656,11 +1682,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
 
-                    // TODO: IGNITE-17197 Remove assert after the ticket is resolved.
-                    assert tbl.internalTable().storage() instanceof MvTableStorage :
-                            "Only multi version storages are supported. Current storage is a "
-                                    + tbl.internalTable().storage().getClass().getName();
-
                     // Stable assignments from the meta store, which revision is bounded by the current pending event.
                     byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
                             pendingAssignmentsWatchEvent.revision()).join().value();
@@ -1680,25 +1701,31 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
 
+                    InternalTable internalTable = tbl.internalTable();
+
                     try {
                         LOG.info("Received update on pending assignments. Check if new raft group should be started"
                                         + " [key={}, partition={}, table={}, localMemberAddress={}]",
                                 pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
 
                         if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
-                            MvPartitionStorage partitionStorage = getOrCreateMvPartition(tbl.internalTable(), partId);
+                            MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId);
+
+                            TxStateStorage txStatePartitionStorage = getOrCreateTxStatePartitionStorage(
+                                    internalTable.txStateStorage(),
+                                    partId
+                            );
 
                             RaftGroupOptions groupOptions = groupOptionsForPartition(
-                                    tbl.internalTable(),
-                                    partId,
-                                    tblCfg,
-                                    partitionStorage,
+                                    internalTable.storage(),
+                                    internalTable.txStateStorage(),
+                                    partitionKey(internalTable, partId),
                                     assignments
                             );
 
                             RaftGroupListener raftGrpLsnr = new PartitionListener(
-                                    partitionDataStorage(partitionStorage, tbl.internalTable(), partId),
-                                    tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+                                    partitionDataStorage(mvPartitionStorage, internalTable, partId),
+                                    txStatePartitionStorage,
                                     txManager,
                                     primaryIndex
                             );
@@ -1709,7 +1736,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     replicaGrpId,
                                     partId,
                                     busyLock,
-                                    movePartition(() -> tbl.internalTable().partitionRaftGroupService(partId)),
+                                    movePartition(() -> internalTable.partitionRaftGroupService(partId)),
                                     TableManager.this::calculateAssignments,
                                     rebalanceScheduler
                             );
@@ -1724,19 +1751,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         }
 
                         if (replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
-                            MvPartitionStorage partitionStorage = getOrCreateMvPartition(tbl.internalTable(), partId);
+                            MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(internalTable.storage(), partId);
+
+                            TxStateStorage txStatePartitionStorage = getOrCreateTxStatePartitionStorage(
+                                    internalTable.txStateStorage(),
+                                    partId
+                            );
 
                             replicaMgr.startReplica(replicaGrpId,
                                     new PartitionReplicaListener(
-                                            partitionStorage,
-                                            tbl.internalTable().partitionRaftGroupService(partId),
+                                            mvPartitionStorage,
+                                            internalTable.partitionRaftGroupService(partId),
                                             txManager,
                                             lockMgr,
                                             partId,
                                             tblId,
                                             primaryIndex,
                                             clock,
-                                            tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+                                            txStatePartitionStorage,
                                             raftMgr.topologyService(),
                                             placementDriver
                                     )
@@ -1754,7 +1786,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
 
-                    RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(partId);
+                    RaftGroupService partGrpSvc = internalTable.partitionRaftGroupService(partId);
 
                     IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
 
@@ -1922,4 +1954,60 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     private <T extends ConfigurationProperty<?>> T directProxy(T property) {
         return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
     }
+
+    private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) {
+        return new SnapshotAwarePartitionDataStorage(partitionStorage, outgoingSnapshotsManager, partitionKey(internalTbl, partId));
+    }
+
+    private PartitionKey partitionKey(InternalTable internalTbl, int partId) {
+        return new PartitionKey(internalTbl.tableId(), partId);
+    }
+
+    /**
+     * Returns or creates multi-versioned partition storage.
+     *
+     * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
+     * in when the rebalance was interrupted.
+     *
+     * @param mvTableStorage Multi-versioned table storage.
+     * @param partId Partition ID.
+     */
+    private static MvPartitionStorage getOrCreateMvPartition(MvTableStorage mvTableStorage, int partId) {
+        MvPartitionStorage mvPartitionStorage = mvTableStorage.getOrCreateMvPartition(partId);
+
+        // If a full rebalance did not happen, then we return the storage as is.
+        if (mvPartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
+            return mvPartitionStorage;
+        }
+
+        // A full rebalance was started but not completed, so the partition must be recreated to remove the garbage.
+        mvTableStorage.destroyPartition(partId);
+
+        return mvTableStorage.getOrCreateMvPartition(partId);
+    }
+
+    /**
+     * Returns or creates transaction state storage for a partition.
+     *
+     * <p>If a full rebalance has not been completed for a partition, it will be recreated to remove any garbage that might have been left
+     * in when the rebalance was interrupted.
+     *
+     * @param txStateTableStorage Transaction state storage for a table.
+     * @param partId Partition ID.
+     */
+    private static TxStateStorage getOrCreateTxStatePartitionStorage(
+            TxStateTableStorage txStateTableStorage,
+            int partId
+    ) {
+        TxStateStorage txStatePartitionStorage = txStateTableStorage.getOrCreateTxStateStorage(partId);
+
+        // If a full rebalance did not happen, then we return the storage as is.
+        if (txStatePartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
+            return txStatePartitionStorage;
+        }
+
+        txStateTableStorage.destroyTxStateStorage(partId);
+
+        return txStateTableStorage.getOrCreateTxStateStorage(partId);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index 1ab37e5bc8..a0abd1899d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
-import java.util.List;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 
 /**
  * Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage.
@@ -29,31 +27,30 @@ import org.jetbrains.annotations.Nullable;
 public interface PartitionAccess {
     /**
      * Returns the key that uniquely identifies the corresponding partition.
-     *
-     * @return Partition key.
      */
-    PartitionKey key();
+    PartitionKey partitionKey();
+
+    /**
+     * Returns the multi-versioned partition storage.
+     */
+    MvPartitionStorage mvPartitionStorage();
 
     /**
-     * Returns persisted RAFT index for the partition.
+     * Returns transaction state storage for the partition.
      */
-    long persistedIndex();
+    TxStateStorage txStatePartitionStorage();
 
     /**
-     * Returns a row id, existing in the storage, that's greater or equal than the lower bound. {@code null} if not found.
+     * Destroys and recreates the multi-versioned partition storage.
      *
-     * @param lowerBound Lower bound.
-     * @throws StorageException If failed to read data from the storage.
+     * @throws StorageException If an error has occurred during the partition destruction.
      */
-    @Nullable
-    RowId closestRowId(RowId lowerBound);
+    MvPartitionStorage reCreateMvPartitionStorage() throws StorageException;
 
     /**
-     * Returns all versions of a row identified with the given {@link RowId}.
-     * The returned versions are in newest-to-oldest order.
+     * Destroys and recreates the multi-versioned partition storage.
      *
-     * @param rowId Id of the row.
-     * @return All versions of the row.
+     * @throws StorageException If an error has occurred during transaction state storage for the partition destruction.
      */
-    List<ReadResult> rowVersions(RowId rowId);
+    TxStateStorage reCreateTxStatePartitionStorage() throws StorageException;
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index cf20f2c9d8..d847f03095 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,55 +17,85 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 
 /**
- * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ * {@link PartitionAccess} implementation.
  */
 public class PartitionAccessImpl implements PartitionAccess {
     private final PartitionKey partitionKey;
-    private final MvPartitionStorage partitionStorage;
 
-    public PartitionAccessImpl(PartitionKey partitionKey, MvPartitionStorage partitionStorage) {
+    private final MvTableStorage mvTableStorage;
+
+    private final TxStateTableStorage txStateTableStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param partitionKey Partition key.
+     * @param mvTableStorage Multi version table storage.
+     * @param txStateTableStorage Table transaction state storage.
+     */
+    public PartitionAccessImpl(
+            PartitionKey partitionKey,
+            MvTableStorage mvTableStorage,
+            TxStateTableStorage txStateTableStorage
+    ) {
         this.partitionKey = partitionKey;
-        this.partitionStorage = partitionStorage;
+        this.mvTableStorage = mvTableStorage;
+        this.txStateTableStorage = txStateTableStorage;
     }
 
     @Override
-    public PartitionKey key() {
+    public PartitionKey partitionKey() {
         return partitionKey;
     }
 
     @Override
-    public long persistedIndex() {
-        return partitionStorage.persistedIndex();
+    public MvPartitionStorage mvPartitionStorage() {
+        MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(partId());
+
+        assert mvPartition != null : "table=" + tableName() + ", part=" + partId();
+
+        return mvPartition;
+    }
+
+    @Override
+    public TxStateStorage txStatePartitionStorage() {
+        TxStateStorage txStatePartitionStorage = txStateTableStorage.getTxStateStorage(partId());
+
+        assert txStatePartitionStorage != null : "table=" + tableName() + ", part=" + partId();
+
+        return txStatePartitionStorage;
     }
 
     @Override
-    public @Nullable RowId closestRowId(RowId lowerBound) {
-        return partitionStorage.closestRowId(lowerBound);
+    public MvPartitionStorage reCreateMvPartitionStorage() throws StorageException {
+        assert mvTableStorage.getMvPartition(partId()) != null : "table=" + tableName() + ", part=" + partId();
+
+        mvTableStorage.destroyPartition(partId());
+
+        return mvTableStorage.getOrCreateMvPartition(partId());
     }
 
     @Override
-    public List<ReadResult> rowVersions(RowId rowId) {
-        try (Cursor<ReadResult> cursor = partitionStorage.scanVersions(rowId)) {
-            List<ReadResult> versions = new ArrayList<>();
+    public TxStateStorage reCreateTxStatePartitionStorage() throws StorageException {
+        assert txStateTableStorage.getTxStateStorage(partId()) != null : "table=" + tableName() + ", part=" + partId();
 
-            for (ReadResult version : cursor) {
-                versions.add(version);
-            }
+        txStateTableStorage.destroyTxStateStorage(partId());
 
-            return versions;
-        } catch (Exception e) {
-            // TODO: IGNITE-17935 - handle this
+        return txStateTableStorage.getOrCreateTxStateStorage(partId());
+    }
+
+    private int partId() {
+        return partitionKey.partitionId();
+    }
 
-            throw new RuntimeException(e);
-        }
+    private String tableName() {
+        return mvTableStorage.configuration().name().value();
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
index dd914c39f1..683fa67fce 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java
@@ -26,12 +26,11 @@ import org.apache.ignite.internal.tostring.S;
  */
 public class PartitionKey {
     private final UUID tableId;
+
     private final int partitionId;
 
     /**
      * Returns ID of the table.
-     *
-     * @return ID of the table.
      */
     public UUID tableId() {
         return tableId;
@@ -39,8 +38,6 @@ public class PartitionKey {
 
     /**
      * Returns partition ID.
-     *
-     * @return Partition ID.
      */
     public int partitionId() {
         return partitionId;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 49315d430d..b9b73c9eca 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
@@ -58,6 +59,9 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
     /** Snapshot meta, constructed from the storage data and raft group configuration. */
     private final SnapshotMeta snapshotMeta;
 
+    /** Incoming snapshots executor. */
+    private final Executor incomingSnapshotsExecutor;
+
     /** Snapshot throttle instance. */
     @Nullable
     private SnapshotThrottle snapshotThrottle;
@@ -74,6 +78,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
      * @param raftOptions RAFT options.
      * @param partition Partition.
      * @param snapshotMeta Snapshot meta.
+     * @param incomingSnapshotsExecutor Incoming snapshots executor.
      */
     public PartitionSnapshotStorage(
             TopologyService topologyService,
@@ -81,7 +86,8 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
             String snapshotUri,
             RaftOptions raftOptions,
             PartitionAccess partition,
-            SnapshotMeta snapshotMeta
+            SnapshotMeta snapshotMeta,
+            Executor incomingSnapshotsExecutor
     ) {
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
@@ -89,6 +95,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
         this.raftOptions = raftOptions;
         this.partition = partition;
         this.snapshotMeta = snapshotMeta;
+        this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
     }
 
     /**
@@ -136,37 +143,39 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
     /**
      * Returns a snapshot throttle instance.
      */
-    public SnapshotThrottle snapshotThrottle() {
+    public @Nullable SnapshotThrottle snapshotThrottle() {
         return snapshotThrottle;
     }
 
-    /** {@inheritDoc} */
+    /**
+     * Returns the incoming snapshots executor.
+     */
+    public Executor getIncomingSnapshotsExecutor() {
+        return incomingSnapshotsExecutor;
+    }
+
     @Override
     public boolean init(Void opts) {
         // No-op.
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void shutdown() {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean setFilterBeforeCopyRemote() {
         // Option is not supported.
         return false;
     }
 
-    /** {@inheritDoc} */
     @Override
     public SnapshotWriter create() {
         return new PartitionSnapshotWriter(this);
     }
 
-    /** {@inheritDoc} */
     @Override
     public SnapshotReader open() {
         if (startupSnapshotOpened.compareAndSet(false, true)) {
@@ -176,13 +185,11 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
         return new OutgoingSnapshotReader(this);
     }
 
-    /** {@inheritDoc} */
     @Override
     public SnapshotReader copyFrom(String uri, SnapshotCopierOptions opts) {
         throw new UnsupportedOperationException("Synchronous snapshot copy is not supported.");
     }
 
-    /** {@inheritDoc} */
     @Override
     public SnapshotCopier startToCopyFrom(String uri, SnapshotCopierOptions opts) {
         SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri);
@@ -194,7 +201,6 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
         return copier;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) {
         this.snapshotThrottle = snapshotThrottle;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 93a0fa9052..e0301bf1bf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -31,7 +32,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 
 /**
  * Snapshot storage factory for {@link MvPartitionStorage}. Utilizes the fact that every partition already stores its latest applied index
- * and thus can inself be used as its own snapshot.
+ * and thus can itself be used as its own snapshot.
  *
  * <p/>Uses {@link MvPartitionStorage#persistedIndex()} and configuration, passed into constructor, to create a {@link SnapshotMeta} object
  * in {@link SnapshotReader#load()}.
@@ -55,9 +56,12 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
     /** List of learners. */
     private final List<String> learners;
 
-    /** RAFT log index read from {@link PartitionAccess#persistedIndex()} during factory instantiation. */
+    /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()} during factory instantiation. */
     private final long persistedRaftIndex;
 
+    /** Incoming snapshots executor. */
+    private final Executor incomingSnapshotsExecutor;
+
     /**
      * Constructor.
      *
@@ -66,6 +70,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
      * @param partition MV partition storage.
      * @param peers List of raft group peers to be used in snapshot meta.
      * @param learners List of raft group learners to be used in snapshot meta.
+     * @param incomingSnapshotsExecutor Incoming snapshots executor.
      * @see SnapshotMeta
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -74,18 +79,19 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionAccess partition,
             List<String> peers,
-            List<String> learners
+            List<String> learners,
+            Executor incomingSnapshotsExecutor
     ) {
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.partition = partition;
         this.peers = peers;
         this.learners = learners;
+        this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
 
-        persistedRaftIndex = partition.persistedIndex();
+        persistedRaftIndex = partition.mvPartitionStorage().persistedIndex();
     }
 
-    /** {@inheritDoc} */
     @Override
     public SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
         SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
@@ -103,7 +109,8 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
                 uri,
                 raftOptions,
                 partition,
-                snapshotMeta
+                snapshotMeta,
+                incomingSnapshotsExecutor
         );
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
index 341d96d4f6..ac49771516 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotUri.java
@@ -30,7 +30,7 @@ public class SnapshotUri {
      * Creates a string representation of the snapshot URI.
      *
      * @param snapshotId Snapshot id.
-     * @param nodeName Sender node name.
+     * @param nodeName Sender node (consistent id) name.
      */
     public static String toStringUri(UUID snapshotId, String nodeName) {
         return nodeName + "-" + snapshotId;
@@ -52,7 +52,7 @@ public class SnapshotUri {
     /** Snapshot id. */
     public final UUID snapshotId;
 
-    /** Sender node name. */
+    /** Sender node (consistent id) name. */
     public final String nodeName;
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 48d5504d31..da699b96a9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -17,94 +17,159 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
 
-import java.io.IOException;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.TableManager.FULL_RABALANCING_STARTED;
+
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
-import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Snapshot copier implementation for partitions. Used to stream partition data from the leader to the local node.
  */
 public class IncomingSnapshotCopier extends SnapshotCopier {
-    /** Messages factory. */
+    private static final IgniteLogger LOG = Loggers.forClass(IncomingSnapshotCopier.class);
+
     private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
 
-    /** {@link SnapshotStorage} instance for the partition. */
-    private final PartitionSnapshotStorage snapshotStorage;
+    private static final long NETWORK_TIMEOUT = Long.MAX_VALUE;
 
-    /** Snapshot URI. */
-    private final SnapshotUri snapshotUri;
+    private final PartitionSnapshotStorage partitionSnapshotStorage;
 
-    /** Rebalance thread-pool, used to write data into a storage. */
-    //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-    // Use external pool.
-    private final ExecutorService threadPool = Executors.newSingleThreadExecutor();
+    private final SnapshotUri snapshotUri;
 
     /**
      * Snapshot meta read from the leader.
      *
      * @see SnapshotMetaRequest
      */
-    private SnapshotMeta snapshotMeta;
+    @Nullable
+    private volatile SnapshotMeta snapshotMeta;
+
+    private volatile boolean canceled;
+
+    @Nullable
+    private volatile CompletableFuture<?> future;
 
     /**
      * Constructor.
      *
-     * @param snapshotStorage Snapshot storage.
+     * @param partitionSnapshotStorage Snapshot storage.
      * @param snapshotUri Snapshot URI.
      */
-    public IncomingSnapshotCopier(PartitionSnapshotStorage snapshotStorage, SnapshotUri snapshotUri) {
-        this.snapshotStorage = snapshotStorage;
+    public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage, SnapshotUri snapshotUri) {
+        this.partitionSnapshotStorage = partitionSnapshotStorage;
         this.snapshotUri = snapshotUri;
     }
 
     @Override
-    public void cancel() {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        // Implement.
+    public void start() {
+        Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
+
+        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+
+        future = prepareMvPartitionStorageForRebalance(executor)
+                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+                .thenCompose(unused -> {
+                    ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
+
+                    if (snapshotSender == null) {
+                        LOG.error(
+                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
+                                partId(),
+                                tableId(),
+                                snapshotUri.nodeName
+                        );
+
+                        if (!isOk()) {
+                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
+                        }
+
+                        return completedFuture(null);
+                    }
+
+                    return loadSnapshotMeta(snapshotSender)
+                            .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
+                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                });
     }
 
     @Override
     public void join() throws InterruptedException {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        // Implement proper join.
+        CompletableFuture<?> fut = future;
+
+        if (fut != null) {
+            try {
+                fut.get();
+
+                if (canceled && !isOk()) {
+                    setError(RaftError.ECANCELED, "Copier is cancelled");
+                }
+            } catch (CancellationException e) {
+                // Ignored.
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+
+                LOG.error("Error when completing the copier", cause);
+
+                if (!isOk()) {
+                    setError(RaftError.UNKNOWN, "Unknown error on completion the copier");
+                }
+
+                // By analogy with LocalSnapshotCopier#join.
+                throw new IllegalStateException(cause);
+            }
+        }
     }
 
     @Override
-    public void start() {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-        // What if node can't be resolved?
-        ClusterNode sourceNode = snapshotStorage.topologyService().getByConsistentId(snapshotUri.nodeName);
-
-        MessagingService messagingService = snapshotStorage.outgoingSnapshotsManager().messagingService();
-
-        threadPool.submit(() -> {
-            //TODO https://issues.apache.org/jira/browse/IGNITE-17262
-            // Following code is just an example of what I expect and shouldn't be considered a template.
-            CompletableFuture<NetworkMessage> metaRequestFuture = messagingService.invoke(
-                    sourceNode,
-                    MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
-                    1000L
-            );
-
-            metaRequestFuture.whenComplete((networkMessage, throwable) -> {
-                SnapshotMetaResponse metaResponse = (SnapshotMetaResponse) networkMessage;
-
-                snapshotMeta = metaResponse.meta();
-            });
-        });
+    public void cancel() {
+        canceled = true;
+
+        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+
+        CompletableFuture<?> fut = future;
+
+        if (fut != null) {
+            try {
+                // Because after the cancellation, no one waits for #join.
+                join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        // No-op.
     }
 
     @Override
@@ -113,8 +178,236 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         return new IncomingSnapshotReader(snapshotMeta);
     }
 
-    @Override
-    public void close() throws IOException {
-        threadPool.shutdownNow();
+    /**
+     * Prepares the {@link MvPartitionStorage} for a full rebalance.
+     *
+     * <p>Recreates {@link MvPartitionStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that
+     * when the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
+     * garbage.
+     */
+    private CompletableFuture<?> prepareMvPartitionStorageForRebalance(Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> partitionSnapshotStorage.partition().reCreateMvPartitionStorage(), executor)
+                .thenCompose(mvPartitionStorage -> {
+                    if (canceled) {
+                        return completedFuture(null);
+                    }
+
+                    mvPartitionStorage.runConsistently(() -> {
+                        mvPartitionStorage.lastAppliedIndex(FULL_RABALANCING_STARTED);
+
+                        return null;
+                    });
+
+                    LOG.info("Copier prepared multi-versioned storage for the partition [partId={}, tableId={}]", partId());
+
+                    return completedFuture(null);
+                });
+    }
+
+    /**
+     * Prepares the {@link TxStateStorage} for a full rebalance.
+     *
+     * <p>Recreates {@link TxStateStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that when
+     * the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
+     * garbage.
+     */
+    private CompletableFuture<?> prepareTxStatePartitionStorageForRebalance(Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> partitionSnapshotStorage.partition().reCreateTxStatePartitionStorage(), executor)
+                .thenCompose(txStatePartitionStorage -> {
+                    if (canceled) {
+                        return completedFuture(null);
+                    }
+
+                    txStatePartitionStorage.lastAppliedIndex(FULL_RABALANCING_STARTED);
+
+                    LOG.info("Copier prepared transaction state storage for the partition [partId={}, tableId={}]", partId());
+
+                    return completedFuture(null);
+                });
+    }
+
+    private @Nullable ClusterNode getSnapshotSender(String nodeName) {
+        return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
+    }
+
+    /**
+     * Requests and saves the snapshot meta in {@link #snapshotMeta}.
+     */
+    private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                snapshotSender,
+                MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
+                NETWORK_TIMEOUT
+        ).thenAccept(response -> {
+            snapshotMeta = ((SnapshotMetaResponse) response).meta();
+
+            LOG.info("Copier has loaded the snapshot meta for the partition [partId={}, tableId={}]", partId(), tableId());
+        });
+    }
+
+    /**
+     * Requests and stores data into {@link MvPartitionStorage}.
+     */
+    private CompletableFuture<?> loadSnapshotMvData(ClusterNode snapshotSender, Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                snapshotSender,
+                MSG_FACTORY.snapshotMvDataRequest().id(snapshotUri.snapshotId).build(),
+                NETWORK_TIMEOUT
+        ).thenComposeAsync(response -> {
+            SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
+
+            MvPartitionStorage mvPartition = partitionSnapshotStorage.partition().mvPartitionStorage();
+
+            for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
+                if (canceled) {
+                    return completedFuture(null);
+                }
+
+                // Let's write all versions for the row ID.
+                mvPartition.runConsistently(() -> {
+                    RowId rowId = new RowId(partId(), entry.rowId());
+
+                    for (int i = 0; i < entry.rowVersions().size(); i++) {
+                        HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
+
+                        BinaryRow binaryRow = new ByteBufferRow(entry.rowVersions().get(i).rewind());
+
+                        if (timestamp == null) {
+                            // Writes an intent to write (uncommitted version).
+                            assert entry.txId() != null;
+                            assert entry.commitTableId() != null;
+                            assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+                            mvPartition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+                        } else {
+                            // Writes committed version.
+                            mvPartition.addWriteCommitted(rowId, binaryRow, timestamp);
+                        }
+                    }
+
+                    return null;
+                });
+            }
+
+            if (snapshotMvDataResponse.finish()) {
+                LOG.info(
+                        "Copier has finished loading multi-versioned data [partId={}, rows={}]",
+                        partId(),
+                        snapshotMvDataResponse.rows().size()
+                );
+
+                return completedFuture(null);
+            } else {
+                LOG.info(
+                        "Copier has loaded a portion of multi-versioned data [partId={}, rows={}]",
+                        partId(),
+                        snapshotMvDataResponse.rows().size()
+                );
+
+                // Let's upload the rest.
+                return loadSnapshotMvData(snapshotSender, executor);
+            }
+        }, executor);
+    }
+
+    /**
+     * Requests and stores data into {@link TxStateStorage}.
+     */
+    private CompletableFuture<?> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) {
+        if (canceled) {
+            return completedFuture(null);
+        }
+
+        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                snapshotSender,
+                MSG_FACTORY.snapshotTxDataRequest().id(snapshotUri.snapshotId).build(),
+                NETWORK_TIMEOUT
+        ).thenComposeAsync(response -> {
+            SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
+
+            TxStateStorage txStatePartitionStorage = partitionSnapshotStorage.partition().txStatePartitionStorage();
+
+            assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size();
+
+            for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
+                if (canceled) {
+                    return completedFuture(null);
+                }
+
+                txStatePartitionStorage.put(snapshotTxDataResponse.txIds().get(i), snapshotTxDataResponse.txMeta().get(i));
+            }
+
+            if (snapshotTxDataResponse.finish()) {
+                LOG.info(
+                        "Copier has finished loading transaction meta [partId={}, metas={}]",
+                        partId(),
+                        snapshotTxDataResponse.txMeta().size()
+                );
+
+                return completedFuture(null);
+            } else {
+                LOG.info(
+                        "Copier has loaded a portion of transaction meta [partId={}, metas={}]",
+                        partId(),
+                        snapshotTxDataResponse.txMeta().size()
+                );
+
+                // Let's upload the rest.
+                return loadSnapshotTxData(snapshotSender, executor);
+            }
+        }, executor);
+    }
+
+    /**
+     * Updates the last applied index for {@link MvPartitionStorage} and {@link TxStateStorage} from the {@link #snapshotMeta}.
+     */
+    private void updateLastAppliedIndexFromSnapshotMetaForStorages() {
+        if (canceled) {
+            return;
+        }
+
+        TxStateStorage txStatePartitionStorage = partitionSnapshotStorage.partition().txStatePartitionStorage();
+        MvPartitionStorage mvPartitionStorage = partitionSnapshotStorage.partition().mvPartitionStorage();
+
+        SnapshotMeta meta = snapshotMeta;
+
+        assert meta != null;
+
+        mvPartitionStorage.runConsistently(() -> {
+            txStatePartitionStorage.lastAppliedIndex(meta.lastIncludedIndex());
+            mvPartitionStorage.lastAppliedIndex(meta.lastIncludedIndex());
+
+            return null;
+        });
+
+        LOG.info(
+                "Copier has finished updating last applied index for the partition [partId={}, lastAppliedIndex={}]",
+                partId(),
+                meta.lastIncludedIndex()
+        );
+    }
+
+    private int partId() {
+        return partitionSnapshotStorage.partition().partitionKey().partitionId();
+    }
+
+    private UUID tableId() {
+        return partitionSnapshotStorage.partition().partitionKey().tableId();
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index edb233402a..1a6ff44f96 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
+import static java.util.stream.Collectors.toList;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -103,7 +105,7 @@ public class OutgoingSnapshot {
         this.partition = partition;
         this.outgoingSnapshotRegistry = outgoingSnapshotRegistry;
 
-        lastRowId = RowId.lowestRowId(partition.key().partitionId());
+        lastRowId = RowId.lowestRowId(partition.partitionKey().partitionId());
     }
 
     /**
@@ -119,7 +121,7 @@ public class OutgoingSnapshot {
      * @return Partition key.
      */
     public PartitionKey partitionKey() {
-        return partition.key();
+        return partition.partitionKey();
     }
 
     /**
@@ -221,11 +223,11 @@ public class OutgoingSnapshot {
         }
 
         if (!startedToReadPartition) {
-            lastRowId = partition.closestRowId(lastRowId);
+            lastRowId = partition.mvPartitionStorage().closestRowId(lastRowId);
 
             startedToReadPartition = true;
         } else {
-            lastRowId = partition.closestRowId(lastRowId.increment());
+            lastRowId = partition.mvPartitionStorage().closestRowId(lastRowId.increment());
         }
 
         if (!exhaustedPartition()) {
@@ -250,7 +252,7 @@ public class OutgoingSnapshot {
     }
 
     private SnapshotMvDataResponse.ResponseEntry rowEntry(RowId rowId) {
-        List<ReadResult> rowVersionsN2O = partition.rowVersions(rowId);
+        List<ReadResult> rowVersionsN2O = partition.mvPartitionStorage().scanVersions(rowId).stream().collect(toList());
 
         List<ByteBuffer> buffers = new ArrayList<>(rowVersionsN2O.size());
         List<HybridTimestamp> commitTimestamps = new ArrayList<>(rowVersionsN2O.size());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index b3fa50a288..4557970b9b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -50,7 +50,7 @@ public class OutgoingSnapshotReader extends SnapshotReader {
         //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         // This meta is wrong, we need a right one.
         snapshotMeta = new RaftMessagesFactory().snapshotMeta()
-                .lastIncludedIndex(snapshotStorage.partition().persistedIndex())
+                .lastIncludedIndex(snapshotStorage.partition().mvPartitionStorage().persistedIndex())
                 .lastIncludedTerm(snapshotStorage.startupSnapshotMeta().lastIncludedTerm())
                 .peersList(snapshotStorage.startupSnapshotMeta().peersList())
                 .learnersList(snapshotStorage.startupSnapshotMeta().learnersList())
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
deleted file mode 100644
index 247ac674ec..0000000000
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.raft.snapshot;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.ReadResult;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.util.Cursor;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class PartitionAccessImplTest {
-    @Mock
-    private MvPartitionStorage partitionStorage;
-
-    private PartitionAccessImpl access;
-
-    private final PartitionKey key = new PartitionKey(UUID.randomUUID(), 1);
-
-    @BeforeEach
-    void createTestInstance() {
-        access = new PartitionAccessImpl(key, partitionStorage);
-    }
-
-    @Test
-    void returnsProvidedKey() {
-        assertThat(access.key(), is(key));
-    }
-
-    @Test
-    void persistedIndexDelegatesToStorage() {
-        when(partitionStorage.persistedIndex()).thenReturn(42L);
-
-        assertThat(access.persistedIndex(), is(42L));
-    }
-
-    @Test
-    void minRowIdDelegatesToStorage() {
-        RowId argRowId = new RowId(1);
-        RowId resultRowId = new RowId(1);
-
-        when(partitionStorage.closestRowId(any())).thenReturn(resultRowId);
-
-        assertThat(access.closestRowId(argRowId), is(resultRowId));
-    }
-
-    @Test
-    void returnsRowVersionsFromStorage() {
-        ReadResult result1 = mock(ReadResult.class);
-        ReadResult result2 = mock(ReadResult.class);
-
-        when(partitionStorage.scanVersions(any()))
-                .thenReturn(Cursor.fromIterator(List.of(result1, result2).iterator()));
-
-        List<ReadResult> versions = access.rowVersions(new RowId(1));
-        assertThat(versions, is(equalTo(List.of(result1, result2))));
-    }
-}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
new file mode 100644
index 0000000000..7c82c79035
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateTableStorage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
+import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link IncomingSnapshotCopier} testing.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class IncomingSnapshotCopierTest {
+    private static final String NODE_NAME = "node";
+
+    private static final int TEST_PARTITION = 0;
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("key", NativeTypes.stringOf(256), false)},
+            new Column[]{new Column("value", NativeTypes.stringOf(256), false)}
+    );
+
+    private static final HybridClock HYBRID_CLOCK = new HybridClock();
+
+    private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory();
+
+    private static final RaftMessagesFactory RAFT_MSG_FACTORY = new RaftMessagesFactory();
+
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    @InjectConfiguration(value = "mock.tables.foo {}")
+    private TablesConfiguration tablesConfig;
+
+    @AfterEach
+    void tearDown() {
+        shutdownAndAwaitTermination(executorService, 1, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void test() throws Exception {
+        MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION);
+        TxStateStorage outgoingTxStatePartitionStorage = new TestConcurrentHashMapTxStateStorage();
+
+        long expLastAppliedIndex = 100500L;
+
+        List<RowId> rowIds = fillMvPartitionStorage(outgoingMvPartitionStorage);
+        List<UUID> txIds = fillTxStatePartitionStorage(outgoingTxStatePartitionStorage);
+
+        outgoingMvPartitionStorage.lastAppliedIndex(expLastAppliedIndex);
+        outgoingTxStatePartitionStorage.lastAppliedIndex(expLastAppliedIndex);
+
+        UUID snapshotId = UUID.randomUUID();
+
+        MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig));
+        TxStateTableStorage incomingTxStateTableStorage = spy(new TestConcurrentHashMapTxStateTableStorage());
+
+        incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+        PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage(
+                snapshotId,
+                expLastAppliedIndex,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                outgoingMvPartitionStorage,
+                outgoingTxStatePartitionStorage,
+                rowIds,
+                txIds
+        );
+
+        SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        runAsync(snapshotCopier::join).get(1, TimeUnit.SECONDS);
+
+        assertEquals(Status.OK().getCode(), snapshotCopier.getCode());
+
+        MvPartitionStorage incomingMvPartitionStorage = incomingMvTableStorage.getMvPartition(TEST_PARTITION);
+        TxStateStorage incomingTxStatePartitionStorage = incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
+
+        assertEquals(outgoingMvPartitionStorage.lastAppliedIndex(), expLastAppliedIndex);
+        assertEquals(outgoingTxStatePartitionStorage.lastAppliedIndex(), expLastAppliedIndex);
+
+        assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
+        assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
+
+        verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
+        verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
+
+        verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
+        verify(incomingTxStateTableStorage, times(2)).getOrCreateTxStateStorage(eq(TEST_PARTITION));
+    }
+
+    private PartitionSnapshotStorage createPartitionSnapshotStorage(
+            UUID snapshotId,
+            long lastAppliedIndexForSnapshotMeta,
+            MvTableStorage incomingTableStorage,
+            TxStateTableStorage incomingTxStateTableStorage,
+            MvPartitionStorage outgoingMvPartitionStorage,
+            TxStateStorage outgoingTxStatePartitionStorage,
+            List<RowId> rowIds,
+            List<UUID> txIds
+    ) {
+        TopologyService topologyService = mock(TopologyService.class);
+
+        ClusterNode clusterNode = mock(ClusterNode.class);
+
+        when(topologyService.getByConsistentId(NODE_NAME)).thenReturn(clusterNode);
+
+        OutgoingSnapshotsManager outgoingSnapshotsManager = mock(OutgoingSnapshotsManager.class);
+
+        MessagingService messagingService = mock(MessagingService.class);
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
+            SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotMetaRequest.id());
+
+            return completedFuture(
+                    TABLE_MSG_FACTORY.snapshotMetaResponse()
+                            .meta(RAFT_MSG_FACTORY.snapshotMeta().lastIncludedIndex(lastAppliedIndexForSnapshotMeta).build())
+                            .build()
+            );
+        });
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotMvDataRequest.class), anyLong())).then(answer -> {
+            SnapshotMvDataRequest snapshotMvDataRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotMvDataRequest.id());
+
+            List<ResponseEntry> responseEntries = createSnapshotMvDataEntries(outgoingMvPartitionStorage, rowIds);
+
+            assertThat(responseEntries, not(empty()));
+
+            return completedFuture(TABLE_MSG_FACTORY.snapshotMvDataResponse().rows(responseEntries).finish(true).build());
+        });
+
+        when(messagingService.invoke(eq(clusterNode), any(SnapshotTxDataRequest.class), anyLong())).then(answer -> {
+            SnapshotTxDataRequest snapshotTxDataRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotTxDataRequest.id());
+
+            List<TxMeta> txMetas = txIds.stream().map(outgoingTxStatePartitionStorage::get).collect(toList());
+
+            return completedFuture(TABLE_MSG_FACTORY.snapshotTxDataResponse().txIds(txIds).txMeta(txMetas).finish(true).build());
+        });
+
+        when(outgoingSnapshotsManager.messagingService()).thenReturn(messagingService);
+
+        return new PartitionSnapshotStorage(
+                topologyService,
+                outgoingSnapshotsManager,
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(RaftOptions.class),
+                new PartitionAccessImpl(
+                        new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
+                        incomingTableStorage,
+                        incomingTxStateTableStorage
+                ),
+                mock(SnapshotMeta.class),
+                executorService
+        );
+    }
+
+    private static List<RowId> fillMvPartitionStorage(MvPartitionStorage storage) {
+        List<RowId> rowIds = List.of(
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION)
+        );
+
+        storage.runConsistently(() -> {
+            // Writes committed version.
+            storage.addWriteCommitted(rowIds.get(0), createBinaryRow("k0", "v0"), HYBRID_CLOCK.now());
+            storage.addWriteCommitted(rowIds.get(1), createBinaryRow("k1", "v1"), HYBRID_CLOCK.now());
+
+            storage.addWriteCommitted(rowIds.get(2), createBinaryRow("k20", "v20"), HYBRID_CLOCK.now());
+            storage.addWriteCommitted(rowIds.get(2), createBinaryRow("k21", "v21"), HYBRID_CLOCK.now());
+
+            // Writes an intent to write (uncommitted version).
+            storage.addWrite(rowIds.get(2), createBinaryRow("k22", "v22"), UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
+
+            storage.addWrite(
+                    rowIds.get(3),
+                    createBinaryRow("k3", "v3"),
+                    UUID.randomUUID(),
+                    UUID.randomUUID(),
+                    TEST_PARTITION
+            );
+
+            return null;
+        });
+
+        return rowIds;
+    }
+
+    private static List<UUID> fillTxStatePartitionStorage(TxStateStorage storage) {
+        List<UUID> txIds = List.of(
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID()
+        );
+
+        UUID tableId = UUID.randomUUID();
+
+        storage.put(txIds.get(0), new TxMeta(COMMITED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(1), new TxMeta(COMMITED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(2), new TxMeta(ABORTED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        storage.put(txIds.get(3), new TxMeta(ABORTED, List.of(new TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+
+        return txIds;
+    }
+
+    private static List<ResponseEntry> createSnapshotMvDataEntries(MvPartitionStorage storage, List<RowId> rowIds) {
+        List<ResponseEntry> responseEntries = new ArrayList<>();
+
+        for (RowId rowId : rowIds) {
+            List<ReadResult> readResults = storage.scanVersions(rowId).stream().collect(toList());
+
+            Collections.reverse(readResults);
+
+            List<ByteBuffer> rowVersions = new ArrayList<>();
+            List<HybridTimestamp> timestamps = new ArrayList<>();
+
+            UUID txId = null;
+            UUID commitTableId = null;
+            int commitPartitionId = ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+            for (ReadResult readResult : readResults) {
+                rowVersions.add(readResult.binaryRow().byteBuffer());
+
+                if (readResult.isWriteIntent()) {
+                    txId = readResult.transactionId();
+                    commitTableId = readResult.commitTableId();
+                    commitPartitionId = readResult.commitPartitionId();
+                } else {
+                    timestamps.add(readResult.commitTimestamp());
+                }
+            }
+
+            responseEntries.add(
+                    TABLE_MSG_FACTORY.responseEntry()
+                            .rowId(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits()))
+                            .rowVersions(rowVersions)
+                            .timestamps(timestamps)
+                            .txId(txId)
+                            .commitTableId(commitTableId)
+                            .commitPartitionId(commitPartitionId)
+                            .build()
+            );
+        }
+
+        return responseEntries;
+    }
+
+    private static BinaryRow createBinaryRow(String key, String value) {
+        return new RowAssembler(SCHEMA_DESCRIPTOR, 1, 1).appendString(key).appendString(value).build();
+    }
+
+    private static void assertEqualsMvRows(MvPartitionStorage expected, MvPartitionStorage actual, List<RowId> rowIds) {
+        for (RowId rowId : rowIds) {
+            List<ReadResult> expReadResults = expected.scanVersions(rowId).stream().collect(toList());
+            List<ReadResult> actReadResults = actual.scanVersions(rowId).stream().collect(toList());
+
+            assertEquals(expReadResults.size(), actReadResults.size(), rowId.toString());
+
+            for (int i = 0; i < expReadResults.size(); i++) {
+                ReadResult expReadResult = expReadResults.get(i);
+                ReadResult actReadResult = actReadResults.get(i);
+
+                String msg = "RowId=" + rowId + ", i=" + i;
+
+                Row expRow = new Row(SCHEMA_DESCRIPTOR, expReadResult.binaryRow());
+                Row actRow = new Row(SCHEMA_DESCRIPTOR, actReadResult.binaryRow());
+
+                assertEquals(expRow.stringValue(0), actRow.stringValue(0), msg);
+                assertEquals(expRow.stringValue(1), actRow.stringValue(1), msg);
+
+                assertEquals(expReadResult.commitTimestamp(), actReadResult.commitTimestamp(), msg);
+                assertEquals(expReadResult.transactionId(), actReadResult.transactionId(), msg);
+                assertEquals(expReadResult.commitTableId(), actReadResult.commitTableId(), msg);
+                assertEquals(expReadResult.commitPartitionId(), actReadResult.commitPartitionId(), msg);
+                assertEquals(expReadResult.isWriteIntent(), actReadResult.isWriteIntent(), msg);
+            }
+        }
+    }
+
+    private static void assertEqualsTxStates(TxStateStorage expected, TxStateStorage actual, List<UUID> txIds) {
+        for (UUID txId : txIds) {
+            assertEquals(expected.get(txId), actual.get(txId));
+        }
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
index 22b6001e63..e5af7246d5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAcces
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
+import org.apache.ignite.internal.util.Cursor;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -55,6 +57,9 @@ class OutgoingSnapshotTest {
     @Mock
     private PartitionAccess partitionAccess;
 
+    @Mock
+    private MvPartitionStorage mvPartitionStorage;
+
     @Mock
     private OutgoingSnapshotRegistry snapshotRegistry;
 
@@ -78,7 +83,9 @@ class OutgoingSnapshotTest {
 
     @BeforeEach
     void createTestInstance() {
-        lenient().when(partitionAccess.key()).thenReturn(partitionKey);
+        lenient().when(partitionAccess.partitionKey()).thenReturn(partitionKey);
+
+        lenient().when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
 
         snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, snapshotRegistry);
     }
@@ -130,9 +137,9 @@ class OutgoingSnapshotTest {
     }
 
     private void configureStorageToHaveExactlyOneRowWith(List<ReadResult> versions) {
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(versions);
-        lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(null);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(versions));
+        lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(null);
     }
 
     private SnapshotMvDataResponse getMvDataResponse(long batchSizeHint) throws InterruptedException, ExecutionException, TimeoutException {
@@ -167,11 +174,11 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1));
-        when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
-        when(partitionAccess.rowVersions(rowId2)).thenReturn(List.of(version2));
-        when(partitionAccess.closestRowId(rowId3)).thenReturn(null);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
+        when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
+        when(mvPartitionStorage.scanVersions(rowId2)).thenReturn(Cursor.fromIterable(List.of(version2)));
+        when(mvPartitionStorage.closestRowId(rowId3)).thenReturn(null);
 
         SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
 
@@ -187,8 +194,8 @@ class OutgoingSnapshotTest {
     void rowsWithIdsToSkipAreIgnored() throws Exception {
         snapshot.addRowIdToSkip(rowId1);
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.closestRowId(rowId2)).thenReturn(null);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(null);
 
         SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
 
@@ -206,7 +213,7 @@ class OutgoingSnapshotTest {
                 clock.now()
         );
 
-        when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version2, version1));
+        when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version2, version1)));
 
         snapshot.enqueueForSending(rowIdOutOfOrder);
 
@@ -230,7 +237,7 @@ class OutgoingSnapshotTest {
     }
 
     private void configureStorageToBeEmpty() {
-        lenient().when(partitionAccess.closestRowId(lowestRowId)).thenReturn(null);
+        lenient().when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(null);
     }
 
     @Test
@@ -238,7 +245,7 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
+        when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
 
         snapshot.enqueueForSending(rowIdOutOfOrder);
 
@@ -257,11 +264,11 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version2));
+        when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version2)));
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1));
-        when(partitionAccess.closestRowId(rowId2)).then(invocation -> {
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
+        when(mvPartitionStorage.closestRowId(rowId2)).then(invocation -> {
             snapshot.enqueueForSending(rowIdOutOfOrder);
             return null;
         });
@@ -333,8 +340,8 @@ class OutgoingSnapshotTest {
     void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() throws Exception {
         ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version));
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version)));
 
         SnapshotMvDataResponse response = getMvDataResponse(1);
 
@@ -345,7 +352,7 @@ class OutgoingSnapshotTest {
     void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() throws Exception {
         ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
 
-        when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
+        when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
 
         snapshot.enqueueForSending(rowIdOutOfOrder);
 
@@ -361,9 +368,9 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
-        lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+        lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
 
         SnapshotMvDataResponse response = getMvDataResponse(1);
 
@@ -387,7 +394,7 @@ class OutgoingSnapshotTest {
     void sendsRowsFromOutOfOrderQueueBiggerThanHint() throws Exception {
         ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
 
-        when(partitionAccess.rowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
+        when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
 
         snapshot.enqueueForSending(rowIdOutOfOrder);
 
@@ -410,9 +417,9 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
-        lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+        lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
 
         getMvDataResponse(1);
 
@@ -424,9 +431,9 @@ class OutgoingSnapshotTest {
         ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
 
-        when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
-        when(partitionAccess.rowVersions(rowId1)).thenReturn(List.of(version1, version2));
-        lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+        when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
+        when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
+        lenient().when(mvPartitionStorage.closestRowId(rowId2)).thenReturn(rowId2);
 
         getMvDataResponse(1);
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 88fe42149c..3142730989 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -94,6 +94,11 @@ public interface TxStateStorage extends AutoCloseable {
      */
     long lastAppliedIndex();
 
+    /**
+     * Sets the last applied index value.
+     */
+    void lastAppliedIndex(long lastAppliedIndex);
+
     /**
      * {@link #lastAppliedIndex()} value consistent with the data, already persisted on the storage.
      */
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
index c241328dd0..0e2bfb208a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.storage.state;
 
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.configuration.storage.StorageException;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.jetbrains.annotations.Nullable;
@@ -48,10 +47,9 @@ public interface TxStateTableStorage extends AutoCloseable {
      * Destroy transaction state storage.
      *
      * @param partitionId Partition id.
-     * @return Future.
      * @throws StorageException In case when the operation has failed.
      */
-    CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException;
+    void destroyTxStateStorage(int partitionId) throws StorageException;
 
     /**
      * Table configuration.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index f130366fd7..dfc5fc0813 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -122,7 +122,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         persistedIndex = lastAppliedIndex;
     }
 
-    /** {@inheritDoc} */
     @Override
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
@@ -145,7 +144,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void put(UUID txId, TxMeta txMeta) {
         if (!busyLock.enterBusy()) {
@@ -166,7 +164,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, long commandIndex) {
         requireNonNull(txMeta);
@@ -221,7 +218,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void remove(UUID txId) {
         if (!busyLock.enterBusy()) {
@@ -242,7 +238,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
         if (!busyLock.enterBusy()) {
@@ -297,19 +292,36 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> flush() {
         return tableStorage.awaitFlush(true);
     }
 
-    /** {@inheritDoc} */
     @Override
     public long lastAppliedIndex() {
         return lastAppliedIndex;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) {
+        if (!busyLock.enterBusy()) {
+            throwStorageStoppedException();
+        }
+
+        try {
+            db.put(lastAppliedIndexKey, longToBytes(lastAppliedIndex));
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_ERR,
+                    "Failed to write applied index value to transaction state storage, partition " + partitionId
+                            + " of table " + tableStorage.configuration().value().name(),
+                    e
+            );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     @Override
     public long persistedIndex() {
         return persistedIndex;
@@ -350,7 +362,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         return appliedIndexBytes == null ? 0 : bytesToLong(appliedIndexBytes);
     }
 
-    /** {@inheritDoc} */
     @Override
     public void destroy() {
         try (WriteBatch writeBatch = new WriteBatch()) {
@@ -396,7 +407,6 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         return new UUID(msb, lsb);
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() throws Exception {
         if (!closeGuard.compareAndSet(false, true)) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
index ce4950c0d8..5cdbbba1e1 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.tx.storage.state.rocksdb;
 
 import static java.util.Collections.reverse;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
@@ -33,8 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.IntSupplier;
 import org.apache.ignite.internal.configuration.storage.StorageException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.tostring.S;
@@ -56,9 +53,6 @@ import org.rocksdb.WriteOptions;
  * RocksDb implementation of {@link TxStateTableStorage}.
  */
 public class TxStateRocksDbTableStorage implements TxStateTableStorage {
-    /** Logger. */
-    private static final IgniteLogger LOG = Loggers.forClass(TxStateRocksDbTableStorage.class);
-
     static {
         RocksDB.loadLibrary();
     }
@@ -144,8 +138,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
+    @Override
+    public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
         checkPartitionId(partitionId);
 
         TxStateRocksDbStorage storage = storages.get(partitionId);
@@ -166,40 +160,36 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
         return storage;
     }
 
-    /** {@inheritDoc} */
-    @Override public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
+    @Override
+    public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
         return storages.get(partitionId);
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException {
+    @Override
+    public void destroyTxStateStorage(int partitionId) throws StorageException {
         checkPartitionId(partitionId);
 
         TxStateStorage storage = storages.getAndSet(partitionId, null);
 
-        if (storage == null) {
-            return completedFuture(null);
-        }
+        if (storage != null) {
+            storage.destroy();
 
-        storage.destroy();
-
-        return awaitFlush(false).whenComplete((v, e) -> {
             try {
                 storage.close();
-            } catch (Exception ex) {
-                LOG.error("Couldn't close the transaction state storage of partition "
+            } catch (Exception e) {
+                throw new StorageException("Couldn't close the transaction state storage of partition "
                         + partitionId + ", table " + tableCfg.value().name());
             }
-        });
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public TableConfiguration configuration() {
+    @Override
+    public TableConfiguration configuration() {
         return tableCfg;
     }
 
-    /** {@inheritDoc} */
-    @Override public void start() throws StorageException {
+    @Override
+    public void start() throws StorageException {
         try {
             flusher = new RocksDbFlusher(
                 busyLock,
@@ -239,8 +229,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void stop() throws StorageException {
+    @Override
+    public void stop() throws StorageException {
         if (!stopGuard.compareAndSet(false, true)) {
             return;
         }
@@ -271,8 +261,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void destroy() throws StorageException {
+    @Override
+    public void destroy() throws StorageException {
         try (Options options = new Options()) {
             close();
 
@@ -305,8 +295,8 @@ public class TxStateRocksDbTableStorage implements TxStateTableStorage {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stop();
     }
 
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
index 1909fd142d..db4a082b35 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateStorage.java
@@ -37,19 +37,18 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
     /** Storage. */
     private final ConcurrentHashMap<UUID, TxMeta> storage = new ConcurrentHashMap<>();
 
-    /** {@inheritDoc} */
+    private volatile long lastAppliedIndex;
+
     @Override
     public TxMeta get(UUID txId) {
         return storage.get(txId);
     }
 
-    /** {@inheritDoc} */
     @Override
     public void put(UUID txId, TxMeta txMeta) {
         storage.put(txId, txMeta);
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean compareAndSet(UUID txId, TxState txStateExpected, @NotNull TxMeta txMeta, long commandIndex) {
         while (true) {
@@ -76,19 +75,16 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void remove(UUID txId) {
         storage.remove(txId);
     }
 
-    /** {@inheritDoc} */
     @Override
     public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
         return Cursor.fromIterator(storage.entrySet().stream().map(e -> new IgniteBiTuple<>(e.getKey(), e.getValue())).iterator());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void destroy() {
         try {
@@ -100,25 +96,26 @@ public class TestConcurrentHashMapTxStateStorage implements TxStateStorage {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> flush() {
         return completedFuture(null);
     }
 
-    /** {@inheritDoc} */
     @Override
     public long lastAppliedIndex() {
-        return 0;
+        return lastAppliedIndex;
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) {
+        this.lastAppliedIndex = lastAppliedIndex;
     }
 
-    /** {@inheritDoc} */
     @Override
     public long persistedIndex() {
-        return 0;
+        return lastAppliedIndex;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() throws Exception {
         // No-op.
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
index c46ff4a6ad..fdd6f91caa 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestConcurrentHashMapTxStateTableStorage.java
@@ -17,10 +17,7 @@
 
 package org.apache.ignite.internal.tx.storage.state.test;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.configuration.storage.StorageException;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -34,49 +31,46 @@ import org.jetbrains.annotations.Nullable;
 public class TestConcurrentHashMapTxStateTableStorage implements TxStateTableStorage {
     private final Map<Integer, TxStateStorage> storages = new ConcurrentHashMap<>();
 
-    /** {@inheritDoc} */
     @Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) throws StorageException {
         return storages.computeIfAbsent(partitionId, k -> new TestConcurrentHashMapTxStateStorage());
     }
 
-    /** {@inheritDoc} */
-    @Override public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
+    @Override
+    public @Nullable TxStateStorage getTxStateStorage(int partitionId) {
         return storages.get(partitionId);
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> destroyTxStateStorage(int partitionId) throws StorageException {
-        TxStateStorage storage = storages.replace(partitionId, null);
+    @Override
+    public void destroyTxStateStorage(int partitionId) throws StorageException {
+        TxStateStorage storage = storages.remove(partitionId);
 
         if (storage != null) {
             storage.destroy();
         }
-
-        return completedFuture(null);
     }
 
-    /** {@inheritDoc} */
-    @Override public TableConfiguration configuration() {
+    @Override
+    public TableConfiguration configuration() {
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public void start() throws StorageException {
+    @Override
+    public void start() throws StorageException {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public void stop() throws StorageException {
+    @Override
+    public void stop() throws StorageException {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public void destroy() throws StorageException {
+    @Override
+    public void destroy() throws StorageException {
         storages.clear();
     }
 
-    /** {@inheritDoc} */
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stop();
     }
 }