You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/24 14:07:50 UTC

[ignite-3] branch main updated: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a8152876f1 IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)
a8152876f1 is described below

commit a8152876f125a281b07cc8ae9527d1e8a412b34b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jan 24 17:07:43 2023 +0300

    IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier (#1562)
---
 .../internal/table/distributed/TableManager.java   |  32 +-
 .../distributed/raft/snapshot/PartitionAccess.java |  86 +++--
 .../raft/snapshot/PartitionAccessImpl.java         |  82 ++---
 .../snapshot/incoming/IncomingSnapshotCopier.java  | 379 +++++++++++----------
 .../incoming/IncomingSnapshotCopierTest.java       |   9 +-
 .../state/test/TestTxStateTableStorage.java        |   4 +-
 6 files changed, 300 insertions(+), 292 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2e40b60fdd..bb02f0016a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -169,14 +169,6 @@ import org.jetbrains.annotations.TestOnly;
  * Table manager.
  */
 public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTablesInternal, IgniteComponent {
-    /**
-     * The special value of the last applied index to indicate the beginning of a full data rebalancing.
-     *
-     * @see MvPartitionStorage#lastAppliedIndex()
-     * @see TxStateStorage#lastAppliedIndex()
-     */
-    public static final long FULL_RABALANCING_STARTED = -1;
-
     private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
 
     // TODO get rid of this in future? IGNITE-17307
@@ -2051,18 +2043,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return Future that will complete when the operation completes.
      */
     private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
-        return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
-                .thenCompose(storage -> {
-                    if (storage.persistedIndex() != FULL_RABALANCING_STARTED) {
-                        // If a full rebalance did not happen, then we return the storage as is.
-                        return completedFuture(storage);
-                    } else {
-                        // A full rebalance was started but not completed, so the partition must be recreated to remove the garbage.
-                        return mvTableStorage
-                                .destroyPartition(partitionId)
-                                .thenApplyAsync(unused -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
-                    }
-                });
+        // TODO: IGNITE-18603 Clear if TxStateStorage hasn't been rebalanced yet
+        return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
     }
 
     /**
@@ -2075,15 +2057,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param partId Partition ID.
      */
     private static TxStateStorage getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
-        TxStateStorage txStatePartitionStorage = txStateTableStorage.getOrCreateTxStateStorage(partId);
-
-        // If a full rebalance did not happen, then we return the storage as is.
-        if (txStatePartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
-            return txStatePartitionStorage;
-        }
-
-        txStateTableStorage.destroyTxStateStorage(partId);
-
+        // TODO: IGNITE-18603 Clear if MvPartitionStorage hasn't been rebalanced yet
         return txStateTableStorage.getOrCreateTxStateStorage(partId);
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index d0297e96c4..75c66e232e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -21,10 +21,12 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.util.Cursor;
@@ -40,21 +42,6 @@ public interface PartitionAccess {
      */
     PartitionKey partitionKey();
 
-    /**
-     * Destroys and recreates the multi-versioned partition storage.
-     *
-     * @return Future that will complete when the partition is recreated.
-     * @throws StorageException If an error has occurred during the partition destruction.
-     */
-    CompletableFuture<Void> reCreateMvPartitionStorage() throws StorageException;
-
-    /**
-     * Destroys and recreates the TX state partition storage.
-     *
-     * @throws StorageException If an error has occurred during destruction of the transaction state storage for the partition.
-     */
-    void reCreateTxStatePartitionStorage() throws StorageException;
-
     /**
      * Creates a cursor to scan all meta of transactions.
      *
@@ -124,11 +111,6 @@ public interface PartitionAccess {
      */
     void addWriteCommitted(RowId rowId, @Nullable TableRow row, HybridTimestamp commitTimestamp);
 
-    /**
-     * Updates the last applied index, term, and RAFT configuration.
-     */
-    void updateLastApplied(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
-
     /**
      * Returns the minimum applied index of the partition storages.
      */
@@ -148,4 +130,68 @@ public interface PartitionAccess {
      * Returns the maximum applied term of the partition storages.
      */
     long maxLastAppliedTerm();
+
+    /**
+     * Prepares partition storages for rebalancing.
+     * <ul>
+     *     <li>Cancels all current operations (including cursors) with storages and waits for their completion;</li>
+     *     <li>Cleans up storages;</li>
+     *     <li>Sets the last applied index and term to {@link MvPartitionStorage#REBALANCE_IN_PROGRESS} and the RAFT group configuration to
+     *     {@code null};</li>
+     *     <li>Only the following methods will be available:<ul>
+     *         <li>{@link #partitionKey()};</li>
+     *         <li>{@link #minLastAppliedIndex()};</li>
+     *         <li>{@link #maxLastAppliedIndex()};</li>
+     *         <li>{@link #minLastAppliedTerm()};</li>
+     *         <li>{@link #maxLastAppliedTerm()};</li>
+     *         <li>{@link #committedGroupConfiguration()};</li>
+     *         <li>{@link #addTxMeta(UUID, TxMeta)};</li>
+     *         <li>{@link #addWrite(RowId, TableRow, UUID, UUID, int)};</li>
+     *         <li>{@link #addWriteCommitted(RowId, TableRow, HybridTimestamp)}.</li>
+     *     </ul></li>
+     * </ul>
+     *
+     * <p>This method must be called before every rebalance, and the rebalance must then be ended by calling one of these methods:
+     * <ul>
+     *     <li>{@link #abortRebalance()} - in case of errors or cancellation of rebalance;</li>
+     *     <li>{@link #finishRebalance(long, long, RaftGroupConfiguration)} - in case of successful completion of rebalance.</li>
+     * </ul>
+     *
+     * @return Future of the operation.
+     * @throws StorageRebalanceException If there are errors when trying to start rebalancing.
+     */
+    CompletableFuture<Void> startRebalance();
+
+    /**
+     * Aborts rebalancing of the partition storages.
+     * <ul>
+     *     <li>Cleans up storages;</li>
+     *     <li>Resets the last applied index, term, and RAFT group configuration;</li>
+     *     <li>All methods will be available.</li>
+     * </ul>
+     *
+     * <p>If rebalance has not started, then nothing will happen.
+     *
+     * @return Future of the operation.
+     * @throws StorageRebalanceException If there are errors when trying to abort rebalancing.
+     */
+    CompletableFuture<Void> abortRebalance();
+
+    /**
+     * Completes rebalancing of the partition storages.
+     * <ul>
+     *     <li>Cleans up storages;</li>
+     *     <li>Updates the last applied index, term, and RAFT group configuration;</li>
+     *     <li>All methods will be available.</li>
+     * </ul>
+     *
+     * <p>If rebalance has not started, then {@link StorageRebalanceException} will be thrown.
+     *
+     * @param lastAppliedIndex Last applied index.
+     * @param lastAppliedTerm Last applied term.
+     * @param raftGroupConfig RAFT group configuration.
+     * @return Future of the operation.
+     * @throws StorageRebalanceException If there are errors when trying to finish rebalancing.
+     */
+    CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 2b28e74334..4748a541da 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
-import static org.apache.ignite.internal.table.distributed.TableManager.FULL_RABALANCING_STARTED;
-
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -29,7 +27,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.tx.TxMeta;
@@ -77,36 +74,6 @@ public class PartitionAccessImpl implements PartitionAccess {
         return partitionKey;
     }
 
-    @Override
-    public CompletableFuture<Void> reCreateMvPartitionStorage() throws StorageException {
-        assert mvTableStorage.getMvPartition(partitionId()) != null : "table=" + tableName() + ", part=" + partitionId();
-
-        // TODO: IGNITE-18030 - actually recreate or do in a different way
-        //return mvTableStorage.destroyPartition(partId())
-        return CompletableFuture.completedFuture(null)
-                .thenApply(unused -> {
-                    MvPartitionStorage mvPartitionStorage = mvTableStorage.getOrCreateMvPartition(partitionId());
-
-                    mvPartitionStorage.runConsistently(() -> {
-                        mvPartitionStorage.lastApplied(FULL_RABALANCING_STARTED, 0);
-
-                        return null;
-                    });
-
-                    return null;
-                });
-    }
-
-    @Override
-    public void reCreateTxStatePartitionStorage() throws StorageException {
-        assert txStateTableStorage.getTxStateStorage(partitionId()) != null : "table=" + tableName() + ", part=" + partitionId();
-
-        // TODO: IGNITE-18030 - actually recreate or do in a different way
-        //txStateTableStorage.destroyTxStateStorage(partId());
-
-        txStateTableStorage.getOrCreateTxStateStorage(partitionId()).lastApplied(FULL_RABALANCING_STARTED, 0);
-    }
-
     private int partitionId() {
         return partitionKey.partitionId();
     }
@@ -166,22 +133,6 @@ public class PartitionAccessImpl implements PartitionAccess {
         });
     }
 
-    @Override
-    public void updateLastApplied(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
-        MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
-        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
-
-        mvPartitionStorage.runConsistently(() -> {
-            mvPartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
-
-            txStateStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
-
-            mvPartitionStorage.committedGroupConfiguration(raftGroupConfig);
-
-            return null;
-        });
-    }
-
     @Override
     public long minLastAppliedIndex() {
         return Math.min(
@@ -214,6 +165,39 @@ public class PartitionAccessImpl implements PartitionAccess {
         );
     }
 
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        // TODO: IGNITE-18619 Remove this workaround, we should have already waited for the indexes to be created
+        indexes.get();
+
+        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+        return CompletableFuture.allOf(
+                mvTableStorage.startRebalancePartition(partitionId()),
+                txStateStorage.startRebalance()
+        );
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalance() {
+        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+        return CompletableFuture.allOf(
+                mvTableStorage.abortRebalancePartition(partitionId()),
+                txStateStorage.abortRebalance()
+        );
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
+        TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+
+        return CompletableFuture.allOf(
+                mvTableStorage.finishRebalancePartition(partitionId(), lastAppliedIndex, lastAppliedTerm, raftGroupConfig),
+                txStateStorage.finishRebalance(lastAppliedIndex, lastAppliedTerm)
+        );
+    }
+
     private MvPartitionStorage getMvPartitionStorage(int partitionId) {
         MvPartitionStorage mvPartitionStorage = mvTableStorage.getMvPartition(partitionId);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f8a698b5d1..3e05c7f320 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -20,11 +20,11 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 import java.nio.ByteBuffer;
-import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -69,6 +70,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private final SnapshotUri snapshotUri;
 
+    /** Busy lock for synchronous rebalance cancellation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /**
      * Snapshot meta read from the leader.
      *
@@ -77,10 +81,14 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
     @Nullable
     private volatile SnapshotMeta snapshotMeta;
 
-    private volatile boolean canceled;
+    @Nullable
+    private volatile CompletableFuture<?> rebalanceFuture;
 
+    /**
+     * Future to wait on in {@link #join()}, because it is important to wait until the rebalance has either finished or been aborted.
+     */
     @Nullable
-    private volatile CompletableFuture<?> future;
+    private volatile CompletableFuture<?> joinFuture;
 
     /**
      * Constructor.
@@ -97,46 +105,31 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
                 });
+
+        joinFuture = rebalanceFuture.handle((unused, throwable) -> completeRebalance(throwable)).thenCompose(Function.identity());
     }
 
     @Override
     public void join() throws InterruptedException {
-        CompletableFuture<?> fut = future;
+        CompletableFuture<?> fut = joinFuture;
 
         if (fut != null) {
             try {
                 fut.get();
-
-                if (canceled && !isOk()) {
-                    setError(RaftError.ECANCELED, "Copier is cancelled");
-                }
             } catch (CancellationException e) {
                 // Ignored.
             } catch (ExecutionException e) {
@@ -156,11 +149,11 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     @Override
     public void cancel() {
-        canceled = true;
+        busyLock.block();
 
-        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
 
-        CompletableFuture<?> fut = future;
+        CompletableFuture<?> fut = rebalanceFuture;
 
         if (fut != null) {
             fut.cancel(false);
@@ -185,34 +178,6 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         return new IncomingSnapshotReader(snapshotMeta);
     }
 
-    /**
-     * Prepares the {@link MvPartitionStorage} for a full rebalance.
-     */
-    private CompletableFuture<?> prepareMvPartitionStorageForRebalance() {
-        if (canceled) {
-            return completedFuture(null);
-        }
-
-        // TODO: IGNITE-18030 Delete it
-        return partitionSnapshotStorage.partition().reCreateMvPartitionStorage();
-    }
-
-    /**
-     * Prepares the {@link TxStateStorage} for a full rebalance.
-     *
-     * <p>Recreates {@link TxStateStorage} and sets the last applied index to {@link TableManager#FULL_RABALANCING_STARTED} so that when
-     * the node is restarted, we can understand that the full rebalance has not completed, and we need to clean up the storage from
-     * garbage.
-     */
-    private CompletableFuture<?> prepareTxStatePartitionStorageForRebalance(Executor executor) {
-        if (canceled) {
-            return completedFuture(null);
-        }
-
-        // TODO: IGNITE-18030 Delete it
-        return CompletableFuture.runAsync(() -> partitionSnapshotStorage.partition().reCreateTxStatePartitionStorage(), executor);
-    }
-
     private @Nullable ClusterNode getSnapshotSender(String nodeName) {
         return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
     }
@@ -221,178 +186,220 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
      * Requests and saves the snapshot meta in {@link #snapshotMeta}.
      */
     private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
-        if (canceled) {
+        if (!busyLock.enterBusy()) {
             return completedFuture(null);
         }
 
-        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
-                snapshotSender,
-                MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
-                NETWORK_TIMEOUT
-        ).thenAccept(response -> {
-            snapshotMeta = ((SnapshotMetaResponse) response).meta();
-
-            LOG.info("Copier has loaded the snapshot meta for the partition [partId={}, tableId={}, meta={}]",
-                    partId(), tableId(), snapshotMeta);
-        });
+        try {
+            return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                    snapshotSender,
+                    MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
+                    NETWORK_TIMEOUT
+            ).thenAccept(response -> {
+                snapshotMeta = ((SnapshotMetaResponse) response).meta();
+
+                LOG.info("Copier has loaded the snapshot meta for the partition [{}, meta={}]", createPartitionInfo(), snapshotMeta);
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * Requests and stores data into {@link MvPartitionStorage}.
      */
     private CompletableFuture<?> loadSnapshotMvData(ClusterNode snapshotSender, Executor executor) {
-        if (canceled) {
+        if (!busyLock.enterBusy()) {
             return completedFuture(null);
         }
 
-        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
-                snapshotSender,
-                MSG_FACTORY.snapshotMvDataRequest()
-                        .id(snapshotUri.snapshotId)
-                        .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
-                        .build(),
-                NETWORK_TIMEOUT
-        ).thenComposeAsync(response -> {
-            SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
-
-            for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
-                if (canceled) {
-                    return completedFuture(null);
-                }
-
-                // Let's write all versions for the row ID.
-                RowId rowId = new RowId(partId(), entry.rowId());
-
-                for (int i = 0; i < entry.rowVersions().size(); i++) {
-                    HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
-
-                    ByteBuffer rowVersion = entry.rowVersions().get(i);
-
-                    TableRow tableRow = rowVersion == null ? null : new TableRow(rowVersion.rewind());
-
-                    PartitionAccess partition = partitionSnapshotStorage.partition();
-
-                    if (timestamp == null) {
-                        // Writes an intent to write (uncommitted version).
-                        assert entry.txId() != null;
-                        assert entry.commitTableId() != null;
-                        assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+        try {
+            return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                    snapshotSender,
+                    MSG_FACTORY.snapshotMvDataRequest()
+                            .id(snapshotUri.snapshotId)
+                            .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
+                            .build(),
+                    NETWORK_TIMEOUT
+            ).thenComposeAsync(response -> {
+                SnapshotMvDataResponse snapshotMvDataResponse = ((SnapshotMvDataResponse) response);
+
+                for (ResponseEntry entry : snapshotMvDataResponse.rows()) {
+                    // Let's write all versions for the row ID.
+                    for (int i = 0; i < entry.rowVersions().size(); i++) {
+                        if (!busyLock.enterBusy()) {
+                            return completedFuture(null);
+                        }
 
-                        partition.addWrite(rowId, tableRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
-                    } else {
-                        // Writes committed version.
-                        partition.addWriteCommitted(rowId, tableRow, timestamp);
+                        try {
+                            writeVersion(entry, i);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
                     }
                 }
-            }
 
-            if (snapshotMvDataResponse.finish()) {
-                LOG.info(
-                        "Copier has finished loading multi-versioned data [partId={}, rows={}]",
-                        partId(),
-                        snapshotMvDataResponse.rows().size()
-                );
-
-                return completedFuture(null);
-            } else {
-                LOG.info(
-                        "Copier has loaded a portion of multi-versioned data [partId={}, rows={}]",
-                        partId(),
-                        snapshotMvDataResponse.rows().size()
-                );
-
-                // Let's upload the rest.
-                return loadSnapshotMvData(snapshotSender, executor);
-            }
-        }, executor);
+                if (snapshotMvDataResponse.finish()) {
+                    LOG.info(
+                            "Copier has finished loading multi-versioned data [{}, rows={}]",
+                            createPartitionInfo(),
+                            snapshotMvDataResponse.rows().size()
+                    );
+
+                    return completedFuture(null);
+                } else {
+                    LOG.info(
+                            "Copier has loaded a portion of multi-versioned data [{}, rows={}]",
+                            createPartitionInfo(),
+                            snapshotMvDataResponse.rows().size()
+                    );
+
+                    // Let's upload the rest.
+                    return loadSnapshotMvData(snapshotSender, executor);
+                }
+            }, executor);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * Requests and stores data into {@link TxStateStorage}.
      */
     private CompletableFuture<?> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) {
-        if (canceled) {
+        if (!busyLock.enterBusy()) {
             return completedFuture(null);
         }
 
-        return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
-                snapshotSender,
-                MSG_FACTORY.snapshotTxDataRequest()
-                        .id(snapshotUri.snapshotId)
-                        .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
-                        .build(),
-                NETWORK_TIMEOUT
-        ).thenComposeAsync(response -> {
-            SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
-
-            assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size();
+        try {
+            return partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(
+                    snapshotSender,
+                    MSG_FACTORY.snapshotTxDataRequest()
+                            .id(snapshotUri.snapshotId)
+                            .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
+                            .build(),
+                    NETWORK_TIMEOUT
+            ).thenComposeAsync(response -> {
+                SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;
+
+                assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size() : createPartitionInfo();
+
+                for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
+                    if (!busyLock.enterBusy()) {
+                        return completedFuture(null);
+                    }
 
-            for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
-                if (canceled) {
-                    return completedFuture(null);
+                    try {
+                        partitionSnapshotStorage.partition().addTxMeta(
+                                snapshotTxDataResponse.txIds().get(i),
+                                snapshotTxDataResponse.txMeta().get(i)
+                        );
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
                 }
 
-                partitionSnapshotStorage.partition().addTxMeta(
-                        snapshotTxDataResponse.txIds().get(i),
-                        snapshotTxDataResponse.txMeta().get(i)
-                );
-            }
+                if (snapshotTxDataResponse.finish()) {
+                    LOG.info(
+                            "Copier has finished loading transaction meta [{}, metas={}]",
+                            createPartitionInfo(),
+                            snapshotTxDataResponse.txMeta().size()
+                    );
 
-            if (snapshotTxDataResponse.finish()) {
-                LOG.info(
-                        "Copier has finished loading transaction meta [partId={}, metas={}]",
-                        partId(),
-                        snapshotTxDataResponse.txMeta().size()
-                );
-
-                return completedFuture(null);
-            } else {
-                LOG.info(
-                        "Copier has loaded a portion of transaction meta [partId={}, metas={}]",
-                        partId(),
-                        snapshotTxDataResponse.txMeta().size()
-                );
-
-                // Let's upload the rest.
-                return loadSnapshotTxData(snapshotSender, executor);
-            }
-        }, executor);
+                    return completedFuture(null);
+                } else {
+                    LOG.info(
+                            "Copier has loaded a portion of transaction meta [{}, metas={}]",
+                            createPartitionInfo(),
+                            snapshotTxDataResponse.txMeta().size()
+                    );
+
+                    // Let's load the rest.
+                    return loadSnapshotTxData(snapshotSender, executor);
+                }
+            }, executor);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
-     * Updates the last applied index for {@link MvPartitionStorage} and {@link TxStateStorage} from the {@link #snapshotMeta}.
+     * Completes rebalancing of the partition storages: aborts it on cancellation or error, finishes it otherwise.
+     *
+     * @param throwable Error occurred while rebalancing the partition storages, {@code null} means that the rebalancing was successful.
      */
-    private void updateLastAppliedIndexFromSnapshotMetaForStorages() {
-        if (canceled) {
-            return;
-        }
+    private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable) {
+        if (!busyLock.enterBusy()) {
+            if (isOk()) { // Report the cancellation only if no error has been set yet.
+                setError(RaftError.ECANCELED, "Copier is cancelled");
+            }
 
-        SnapshotMeta meta = snapshotMeta;
+            return partitionSnapshotStorage.partition().abortRebalance();
+        }
 
-        assert meta != null;
+        try {
+            if (throwable != null) {
+                LOG.error("Partition rebalancing error [{}]", throwable, createPartitionInfo());
 
-        RaftGroupConfiguration raftGroupConfig = new RaftGroupConfiguration(
-                meta.peersList(),
-                meta.learnersList(),
-                meta.oldPeersList(),
-                meta.oldLearnersList()
-        );
+                if (isOk()) { // Do not overwrite an error that has already been set.
+                    setError(RaftError.UNKNOWN, throwable.getMessage());
+                }
 
-        partitionSnapshotStorage.partition().updateLastApplied(meta.lastIncludedIndex(), meta.lastIncludedTerm(), raftGroupConfig);
+                return partitionSnapshotStorage.partition().abortRebalance();
+            }
 
-        LOG.info(
-                "Copier has finished updating last applied index for the partition [partId={}, lastAppliedIndex={}, lastAppliedTerm={}]",
-                partId(),
-                meta.lastIncludedIndex(),
-                meta.lastIncludedTerm()
-        );
+            SnapshotMeta meta = snapshotMeta;
+
+            RaftGroupConfiguration raftGroupConfig = new RaftGroupConfiguration(
+                    meta.peersList(),
+                    meta.learnersList(),
+                    meta.oldPeersList(),
+                    meta.oldLearnersList()
+            );
+
+            LOG.info(
+                    "Copier completes the rebalancing of the partition: [{}, lastAppliedIndex={}, lastAppliedTerm={}, raftGroupConfig={}]",
+                    createPartitionInfo(),
+                    meta.lastIncludedIndex(),
+                    meta.lastIncludedTerm(),
+                    raftGroupConfig
+            );
+
+            return partitionSnapshotStorage.partition().finishRebalance(meta.lastIncludedIndex(), meta.lastIncludedTerm(), raftGroupConfig);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     private int partId() {
         return partitionSnapshotStorage.partition().partitionKey().partitionId();
     }
 
-    private UUID tableId() {
-        return partitionSnapshotStorage.partition().partitionKey().tableId();
+    private String createPartitionInfo() {
+        return "tableId=" + partitionSnapshotStorage.partition().partitionKey().tableId() + ", partitionId=" + partId();
+    }
+
+    private void writeVersion(ResponseEntry entry, int i) {
+        RowId rowId = new RowId(partId(), entry.rowId());
+
+        HybridTimestamp timestamp = i < entry.timestamps().size() ? entry.timestamps().get(i) : null;
+
+        ByteBuffer rowVersion = entry.rowVersions().get(i);
+
+        TableRow tableRow = rowVersion == null ? null : new TableRow(rowVersion.rewind());
+
+        PartitionAccess partition = partitionSnapshotStorage.partition();
+
+        if (timestamp == null) {
+            // Writes an intent to write (uncommitted version).
+            assert entry.txId() != null;
+            assert entry.commitTableId() != null;
+            assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
+
+            partition.addWrite(rowId, tableRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+        } else {
+            // Writes committed version.
+            partition.addWriteCommitted(rowId, tableRow, timestamp);
+        }
     }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index bba3b0400d..7e6cd7c60d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -189,13 +189,8 @@ public class IncomingSnapshotCopierTest {
         assertEqualsMvRows(outgoingMvPartitionStorage, incomingMvPartitionStorage, rowIds);
         assertEqualsTxStates(outgoingTxStatePartitionStorage, incomingTxStatePartitionStorage, txIds);
 
-        // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
-        //verify(incomingMvTableStorage, times(1)).destroyPartition(eq(TEST_PARTITION));
-        verify(incomingMvTableStorage, times(2)).getOrCreateMvPartition(eq(TEST_PARTITION));
-
-        // TODO: IGNITE-18030 - uncomment the following line or remove it if not needed after the rework
-        //verify(incomingTxStateTableStorage, times(1)).destroyTxStateStorage(eq(TEST_PARTITION));
-        verify(incomingTxStateTableStorage, times(2)).getOrCreateTxStateStorage(eq(TEST_PARTITION));
+        verify(incomingMvTableStorage, times(1)).startRebalancePartition(eq(TEST_PARTITION));
+        verify(incomingTxStatePartitionStorage, times(1)).startRebalance();
     }
 
     private MessagingService messagingServiceForSuccessScenario(MvPartitionStorage outgoingMvPartitionStorage,
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
index ac23d35c21..d84d4ea013 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateTableStorage.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.tx.storage.state.test;
 
+import static org.mockito.Mockito.spy;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -31,7 +33,7 @@ public class TestTxStateTableStorage implements TxStateTableStorage {
     private final Map<Integer, TxStateStorage> storages = new ConcurrentHashMap<>();
 
     @Override public TxStateStorage getOrCreateTxStateStorage(int partitionId) {
-        return storages.computeIfAbsent(partitionId, k -> new TestTxStateStorage());
+        return storages.computeIfAbsent(partitionId, k -> spy(new TestTxStateStorage()));
     }
 
     @Override