You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/27 07:11:31 UTC

[ignite-3] branch main updated: IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba76fcf2da IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)
ba76fcf2da is described below

commit ba76fcf2da5299e79a8e5204e73a57b4616d7656
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 27 10:11:25 2023 +0300

    IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)
---
 .../internal/storage/engine/MvTableStorage.java    |  26 +++++
 .../ignite/internal/storage/util/StorageState.java |   5 +-
 .../ignite/internal/storage/util/StorageUtils.java |  51 ++++++++++
 .../storage/AbstractMvTableStorageTest.java        |  88 ++++++++++++++---
 .../storage/impl/TestMvPartitionStorage.java       |  16 ++-
 .../internal/storage/impl/TestMvTableStorage.java  |  18 ++++
 .../storage/index/impl/TestHashIndexStorage.java   |  12 ++-
 .../storage/index/impl/TestSortedIndexStorage.java |  12 ++-
 .../pagememory/AbstractPageMemoryTableStorage.java |  29 ++++++
 .../PersistentPageMemoryTableStorage.java          |  32 +-----
 .../pagememory/VolatilePageMemoryTableStorage.java |   2 +-
 .../index/hash/PageMemoryHashIndexStorage.java     |  37 ++++++-
 .../index/sorted/PageMemorySortedIndexStorage.java |  37 ++++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  90 +++++++++++++----
 .../mv/PersistentPageMemoryMvPartitionStorage.java |  52 +++++++---
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |  63 ++++++++----
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  50 ++++++++--
 .../storage/rocksdb/RocksDbTableStorage.java       |  58 ++++++++++-
 .../rocksdb/index/RocksDbHashIndexStorage.java     |  25 +++++
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  25 +++++
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   7 --
 .../internal/table/distributed/TableManager.java   |  41 +++++---
 .../internal/tx/storage/state/TxStateStorage.java  |  18 ++++
 .../state/rocksdb/TxStateRocksDbStorage.java       | 109 ++++++++++++++-------
 .../state/rocksdb/RocksDbTxStateStorageTest.java   |  23 ++---
 .../storage/state/AbstractTxStateStorageTest.java  |  79 ++++++++++++---
 .../tx/storage/state/test/TestTxStateStorage.java  |  16 +++
 27 files changed, 803 insertions(+), 218 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 6e4619759d..71fae079d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.schema.configuration.index.TableIndexConfigura
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -257,4 +258,29 @@ public interface MvTableStorage extends ManuallyCloseable {
             long lastAppliedTerm,
             RaftGroupConfiguration raftGroupConfig
     );
+
+    /**
+     * Clears a partition and all associated indices. After the cleaning is completed, a partition and all associated indices will be fully
+     * available.
+     * <ul>
+     *     <li>Cancels all current operations (including cursors) of a {@link MvPartitionStorage multi-version partition storage} and its
+     *     associated indexes ({@link HashIndexStorage} and {@link SortedIndexStorage}) and waits for their completion;</li>
+     *     <li>Does not allow operations on a multi-version partition storage and its indexes to be performed (exceptions will be thrown)
+     *     until the cleaning is completed;</li>
+     *     <li>Clears a multi-version partition storage and its indexes;</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()}, {@link MvPartitionStorage#lastAppliedTerm()},
+     *     {@link MvPartitionStorage#persistedIndex()} to {@code 0} and {@link MvPartitionStorage#committedGroupConfiguration()} to
+     *     {@code null};</li>
+     *     <li>Once the cleanup of a multi-version partition storage and its indexes is complete (success or error), allows all
+     *     operations on the multi-version partition storage and its indexes again.</li>
+     * </ul>
+     *
+     * @return Future of cleanup of a multi-version partition storage and its indexes.
+     * @throws IllegalArgumentException If Partition ID is out of bounds.
+     * @throws StorageClosedException If a multi-version partition storage and its indexes are already closed or destroyed.
+     * @throws StorageRebalanceException If a multi-version partition storage and its indexes are in the process of rebalance.
+     * @throws StorageException If a multi-version partition storage and its indexes are in the process of cleanup or failed for another
+     *      reason.
+     */
+    CompletableFuture<Void> clearPartition(int partitionId);
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
index 711df878cf..8d22252765 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
@@ -28,5 +28,8 @@ public enum StorageState {
     CLOSED,
 
     /** Storage is in the process of rebalancing. */
-    REBALANCE
+    REBALANCE,
+
+    /** Storage is in the process of cleanup. */
+    CLEANUP
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
index 9034d5567d..cf67fbf12a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
@@ -36,6 +36,49 @@ public class StorageUtils {
         return "Partition ID " + partitionId + " does not exist";
     }
 
+    /**
+     * Throws an exception if the state of the storage is not {@link StorageState#RUNNABLE}.
+     *
+     * @param state Storage state.
+     * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageRebalanceException If storage is in the process of rebalancing.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static void throwExceptionIfStorageNotInRunnableState(StorageState state, Supplier<String> storageInfoSupplier) {
+        if (state != StorageState.RUNNABLE) {
+            throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+        }
+    }
+
+    /**
+     * Throws an exception if the state of the storage is not {@link StorageState#RUNNABLE} OR {@link StorageState#REBALANCE}.
+     *
+     * @param state Storage state.
+     * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static void throwExceptionIfStorageNotInRunnableOrRebalanceState(StorageState state, Supplier<String> storageInfoSupplier) {
+        if (state != StorageState.RUNNABLE && state != StorageState.REBALANCE) {
+            throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+        }
+    }
+
+    /**
+     * Throws an exception if the state of the storage is not {@link StorageState#CLEANUP} OR {@link StorageState#REBALANCE}.
+     *
+     * @param state Storage state.
+     * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static void throwExceptionIfStorageNotInCleanupOrRebalancedState(StorageState state, Supplier<String> storageInfoSupplier) {
+        if (state != StorageState.CLEANUP && state != StorageState.REBALANCE) {
+            throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+        }
+    }
+
     /**
      * Throws an {@link StorageRebalanceException} if the storage is in the process of rebalancing.
      *
@@ -61,6 +104,8 @@ public class StorageUtils {
                 throw new StorageRebalanceException(createStorageClosedErrorMessage(storageInfo));
             case REBALANCE:
                 throw new StorageRebalanceException(createStorageInProcessOfRebalanceErrorMessage(storageInfo));
+            case CLEANUP:
+                throw new StorageRebalanceException(createStorageInProcessOfCleanupErrorMessage(storageInfo));
             default:
                 throw new StorageRebalanceException(createUnexpectedStorageStateErrorMessage(state, storageInfo));
         }
@@ -81,6 +126,8 @@ public class StorageUtils {
                 throw new StorageClosedException(createStorageClosedErrorMessage(storageInfo));
             case REBALANCE:
                 throw new StorageRebalanceException(createStorageInProcessOfRebalanceErrorMessage(storageInfo));
+            case CLEANUP:
+                throw new StorageException(createStorageInProcessOfCleanupErrorMessage(storageInfo));
             default:
                 throw new StorageException(createUnexpectedStorageStateErrorMessage(state, storageInfo));
         }
@@ -109,4 +156,8 @@ public class StorageUtils {
     private static String createStorageClosedErrorMessage(String storageInfo) {
         return IgniteStringFormatter.format("Storage is already closed: [{}]", storageInfo);
     }
+
+    private static String createStorageInProcessOfCleanupErrorMessage(String storageInfo) {
+        return IgniteStringFormatter.format("Storage is in the process of cleanup: [{}]", storageInfo);
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 3a70adbfef..d88d271621 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -39,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assumptions.assumeFalse;
 import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
@@ -593,19 +592,15 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         assertThat(tableStorage.destroy(), willCompleteSuccessfully());
     }
 
-    /**
-     * Checks that if we restart the storages after a crash in the middle of a rebalance, the storages will be empty.
-     */
     @Test
-    public void testRestartStoragesAfterFailDuringRebalance() {
-        assumeFalse(tableStorage.isVolatile());
-
+    public void testRestartStoragesInTheMiddleOfRebalance() {
         MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
         HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
         SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
 
         List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now())
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
         );
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
@@ -632,10 +627,76 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
         sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
 
-        // Let's check the repositories: they should be empty.
-        checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+        if (tableStorage.isVolatile()) {
+            // Let's check the storages: they should be empty.
+            checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+
+            checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        } else {
+            checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+
+            checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        }
+    }
+
+    @Test
+    void testClear() {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.clearPartition(getPartitionIdOutOfRange()));
+
+        // Let's check that there will be an error for a non-existent partition.
+        assertThrows(StorageException.class, () -> tableStorage.clearPartition(PARTITION_ID));
+
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        // Let's check the cleanup for an empty partition.
+        assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());
+
+        checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        assertNull(mvPartitionStorage.committedGroupConfiguration());
+
+        // Let's fill the storages and clean them.
+        List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        );
+
+        RaftGroupConfiguration raftGroupConfig = createRandomRaftGroupConfiguration();
+
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.lastApplied(100, 500);
+
+            mvPartitionStorage.committedGroupConfiguration(raftGroupConfig);
+
+            return null;
+        });
+
+        // Let's clear the storages and check them out.
+        assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());
 
         checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        assertNull(mvPartitionStorage.committedGroupConfiguration());
+
+        checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+    }
+
+    @Test
+    void testClearForCanceledOrRebalancedPartition() {
+        MvPartitionStorage mvPartitionStorage0 = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        tableStorage.getOrCreateMvPartition(PARTITION_ID + 1);
+
+        mvPartitionStorage0.close();
+        assertThat(tableStorage.startRebalancePartition(PARTITION_ID + 1), willCompleteSuccessfully());
+
+        try {
+            assertThrows(StorageClosedException.class, () -> tableStorage.clearPartition(PARTITION_ID));
+            assertThrows(StorageRebalanceException.class, () -> tableStorage.clearPartition(PARTITION_ID + 1));
+        } finally {
+            assertThat(tableStorage.abortRebalancePartition(PARTITION_ID + 1), willCompleteSuccessfully());
+        }
     }
 
     private static void createTestIndexes(TablesConfiguration tablesConfig) {
@@ -742,8 +803,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
             SortedIndexStorage sortedIndexStorage,
             List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rowsBeforeRebalanceStart
     ) {
-        assertThat(rowsBeforeRebalanceStart, hasSize(greaterThanOrEqualTo(2)));
-
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);
 
         // Let's open the cursors before start rebalance.
@@ -852,6 +911,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
             SortedIndexStorage sortedIndexStorage,
             List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows
     ) {
+        assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
+
         for (int i = 0; i < rows.size(); i++) {
             int finalI = i;
 
@@ -865,8 +926,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
             IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), tableRow, rowId);
 
             mvPartitionStorage.runConsistently(() -> {
-                // If even.
-                if ((finalI & 1) == 0) {
+                if ((finalI % 2) == 0) {
                     mvPartitionStorage.addWrite(rowId, tableRow, UUID.randomUUID(), UUID.randomUUID(), rowId.partitionId());
 
                     mvPartitionStorage.commitWrite(rowId, timestamp);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 814e230c69..17e525fe4e 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -540,7 +540,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
         closed = true;
 
-        clear();
+        clear0();
     }
 
     public void destroy() {
@@ -549,9 +549,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
     /** Removes all entries from this storage. */
     public synchronized void clear() {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        clear0();
+    }
+
+    private synchronized void clear0() {
         map.clear();
 
         gcQueue.clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+        groupConfig = null;
     }
 
     private void checkStorageClosed() {
@@ -576,7 +586,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
         rebalance = true;
 
-        clear();
+        clear0();
 
         lastAppliedIndex = REBALANCE_IN_PROGRESS;
         lastAppliedTerm = REBALANCE_IN_PROGRESS;
@@ -593,7 +603,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
         rebalance = false;
 
-        clear();
+        clear0();
 
         lastAppliedIndex = 0;
         lastAppliedTerm = 0;
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index c2d196aff2..33829de4ac 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -339,6 +339,24 @@ public class TestMvTableStorage implements MvTableStorage {
                 });
     }
 
+    @Override
+    public CompletableFuture<Void> clearPartition(int partitionId) {
+        checkPartitionId(partitionId);
+
+        TestMvPartitionStorage mvPartitionStorage = partitions.get(partitionId);
+
+        if (mvPartitionStorage == null) {
+            throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+        }
+
+        mvPartitionStorage.clear();
+
+        testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::clear);
+        testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::clear);
+
+        return completedFuture(null);
+    }
+
     private void checkPartitionId(int partitionId) {
         Integer partitions = tableCfg.partitions().value();
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index d66773b94e..9dbdf16928 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -131,13 +131,19 @@ public class TestHashIndexStorage implements HashIndexStorage {
     public void destroy() {
         closed = true;
 
-        clear();
+        clear0();
     }
 
     /**
      * Removes all index data.
      */
     public void clear() {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        clear0();
+    }
+
+    private void clear0() {
         index.clear();
     }
 
@@ -163,7 +169,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
 
         rebalance = true;
 
-        clear();
+        clear0();
     }
 
     /**
@@ -178,7 +184,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
 
         rebalance = false;
 
-        clear();
+        clear0();
     }
 
     /**
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index ba93ff06d1..9d15eabdc2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -178,13 +178,19 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
     public void destroy() {
         closed = true;
 
-        clear();
+        clear0();
     }
 
     /**
      * Removes all index data.
      */
     public void clear() {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        clear0();
+    }
+
+    private void clear0() {
         index.clear();
     }
 
@@ -337,7 +343,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
         rebalance = true;
 
-        clear();
+        clear0();
     }
 
     /**
@@ -352,7 +358,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
         rebalance = false;
 
-        clear();
+        clear0();
     }
 
     /**
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 6e6b2bf4e5..d2ee618127 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -412,6 +412,35 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
         });
     }
 
+    @Override
+    public CompletableFuture<Void> clearPartition(int partitionId) {
+        return inBusyLock(busyLock, () -> {
+            AbstractPageMemoryMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            try {
+                mvPartitionStorage.startCleanup();
+
+                return clearStorageAndUpdateDataStructures(mvPartitionStorage)
+                        .whenComplete((unused, throwable) -> mvPartitionStorage.finishCleanup());
+            } catch (StorageException e) {
+                mvPartitionStorage.finishCleanup();
+
+                throw e;
+            } catch (Throwable t) {
+                mvPartitionStorage.finishCleanup();
+
+                throw new StorageException(
+                        IgniteStringFormatter.format("Failed to cleanup storage: [{}]", mvPartitionStorage.createStorageInfo()),
+                        t
+                );
+            }
+        });
+    }
+
     /**
      * Clears the partition multi-version storage and all its indexes, updates their internal data structures such as {@link BplusTree},
      * {@link FreeList} and {@link ReuseList}.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 34b3f50f35..5dbeb231df 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
-import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -395,10 +394,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
 
             int partitionId = groupPartitionId.getPartitionId();
 
-            PartitionMeta meta = getOrCreatePartitionMeta(
-                    groupPartitionId,
-                    ensurePartitionFilePageStoreExists(tableView, groupPartitionId)
-            );
+            PartitionMeta meta = getOrCreatePartitionMetaOnCreatePartition(groupPartitionId);
 
             inCheckpointLock(() -> {
                 RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
@@ -410,7 +406,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
 
                 IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructures(
                         meta,
                         rowVersionFreeList,
                         indexColumnsFreeList,
@@ -485,7 +481,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
     }
 
     /**
-     * Creates or receives partition meta from a file, if the rebalancing has not finished in time, the file page store will be recreated.
+     * Creates or receives partition meta from a file.
      *
      * <p>Safe to use without a checkpointReadLock as we read the meta directly without using {@link PageMemory}.
      *
@@ -496,27 +492,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
 
         FilePageStore filePageStore = ensurePartitionFilePageStoreExists(tableView, groupPartitionId);
 
-        PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
-
-        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
-            try {
-                // TODO: IGNITE-18565 We need to return a CompletableFuture
-                destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException(
-                        IgniteStringFormatter.format(
-                                "Error when physically destroying a partition: [table={}, partitionId={}]",
-                                getTableName(),
-                                groupPartitionId.getPartitionId()
-                        ),
-                        e
-                );
-            }
-
-            return getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStoreExists(tableView, groupPartitionId));
-        } else {
-            return partitionMeta;
-        }
+        return getOrCreatePartitionMeta(groupPartitionId, filePageStore);
     }
 
     private void waitPartitionToBeDestroyed(int partitionId) {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 66aeb50a45..584b54e29a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -154,7 +154,7 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
         int partitionId = mvPartitionStorage.partitionId();
         TableView tableView = tableCfg.value();
 
-        volatilePartitionStorage.updateDataStructuresOnRebalance(
+        volatilePartitionStorage.updateDataStructures(
                 createVersionChainTree(partitionId, tableView),
                 createIndexMetaTree(partitionId, tableView)
         );
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 21fd68173f..7ad2203078 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.index.hash;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -286,14 +286,14 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
     }
 
     /**
-     * Updates the internal data structures of the storage on rebalance.
+     * Updates the internal data structures of the storage on rebalance or cleanup.
      *
      * @param freeList Free list to store index columns.
      * @param hashIndexTree Hash index tree instance.
-     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     * @throws StorageException If the storage is not in the cleanup or rebalance state.
      */
-    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
-        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+    public void updateDataStructures(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
+        throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
 
         this.freeList = freeList;
         this.hashIndexTree = hashIndexTree;
@@ -302,4 +302,31 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
     private String createStorageInfo() {
         return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionId);
     }
+
+    /**
+     * Prepares the storage for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    public void startCleanup() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            hashIndexTree.close();
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Finishes cleaning up the storage.
+     */
+    public void finishCleanup() {
+        state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 79cde00a2e..111a1f6f78 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.index.sorted;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
@@ -422,14 +422,14 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
     }
 
     /**
-     * Updates the internal data structures of the storage on rebalance.
+     * Updates the internal data structures of the storage on rebalance or cleanup.
      *
      * @param freeList Free list to store index columns.
      * @param sortedIndexTree Sorted index tree instance.
-     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     * @throws StorageException If the storage is not in the cleanup or rebalance state.
      */
-    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
-        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+    public void updateDataStructures(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
+        throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
 
         this.freeList = freeList;
         this.sortedIndexTree = sortedIndexTree;
@@ -470,4 +470,31 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
             indexRow.indexColumns().link(PageIdUtils.NULL_LINK);
         }
     }
+
+    /**
+     * Prepares the storage for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    public void startCleanup() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            sortedIndexTree.close();
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Finishes cleaning up the storage.
+     */
+    public void finishCleanup() {
+        state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 323d368648..d90c6be71f 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -21,7 +21,8 @@ import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.ge
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -146,6 +147,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
      */
     public void start() {
         busy(() -> {
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
             try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
                 NamedListView<TableIndexView> indexesCfgView = tableStorage.tablesConfiguration().indexes().value();
 
@@ -165,7 +168,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
                 return null;
             } catch (Exception e) {
-                throw new StorageException("Failed to process SQL indexes during the partition start", e);
+                throw new StorageException(
+                        IgniteStringFormatter.format("Failed to process SQL indexes during partition start: [{}]", createStorageInfo()),
+                        e
+                );
             }
         });
     }
@@ -196,6 +202,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta indexMeta) {
+        throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
         var indexDescriptor = new HashIndexDescriptor(indexMeta.id(), tableStorage.tablesConfiguration().value());
 
         HashIndexTree hashIndexTree = createHashIndexTree(indexDescriptor, indexMeta);
@@ -236,9 +244,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException(
                     IgniteStringFormatter.format(
-                            "Error creating hash index tree: [table={}, partitionId={}, indexId={}]",
-                            tableStorage.getTableName(),
-                            partitionId,
+                            "Error creating hash index tree: [{}, indexId={}]",
+                            createStorageInfo(),
                             indexMeta.id()
                     ),
                     e
@@ -247,6 +254,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     private PageMemorySortedIndexStorage createOrRestoreSortedIndex(IndexMeta indexMeta) {
+        throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
         var indexDescriptor = new SortedIndexDescriptor(indexMeta.id(), tableStorage.tablesConfiguration().value());
 
         SortedIndexTree sortedIndexTree = createSortedIndexTree(indexDescriptor, indexMeta);
@@ -300,7 +309,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             if (rowId.partitionId() != partitionId) {
                 throw new IllegalArgumentException(
@@ -517,6 +526,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         assert rowId.partitionId() == partitionId : rowId;
 
         return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             VersionChain currentChain = findVersionChain(rowId);
 
             if (currentChain == null) {
@@ -561,7 +572,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         assert rowId.partitionId() == partitionId : rowId;
 
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             VersionChain currentVersionChain = findVersionChain(rowId);
 
@@ -605,6 +616,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         assert rowId.partitionId() == partitionId : rowId;
 
         busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             VersionChain currentVersionChain = findVersionChain(rowId);
 
             if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -657,6 +670,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         assert rowId.partitionId() == partitionId : rowId;
 
         busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             VersionChain currentChain = findVersionChain(rowId);
 
             if (currentChain != null && currentChain.isUncommitted()) {
@@ -689,7 +704,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             return new ScanVersionsCursor(rowId);
         });
@@ -715,7 +730,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             Cursor<VersionChain> treeCursor;
 
@@ -736,7 +751,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
                 return cursor.hasNext() ? cursor.next().rowId() : null;
@@ -749,7 +764,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     @Override
     public long rowsCount() {
         return busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             try {
                 return versionChainTree.size();
@@ -775,7 +790,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         @Override
         public final ReadResult next() {
             return busy(() -> {
-                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+                throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
 
                 if (!hasNext()) {
                     throw new NoSuchElementException("The cursor is exhausted");
@@ -799,7 +814,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         @Override
         public @Nullable TableRow committed(HybridTimestamp timestamp) {
             return busy(() -> {
-                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+                throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
 
                 if (currentChain == null) {
                     throw new IllegalStateException();
@@ -835,7 +850,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         @Override
         public boolean hasNext() {
             return busy(() -> {
-                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+                throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
 
                 if (nextRead != null) {
                     return true;
@@ -848,8 +863,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 currentChain = null;
 
                 while (true) {
-                    throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
                     if (!treeCursor.hasNext()) {
                         iterationExhausted = true;
 
@@ -887,6 +900,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         @Override
         public boolean hasNext() {
             return busy(() -> {
+                throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+
                 if (nextRead != null) {
                     return true;
                 }
@@ -963,7 +978,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
 
         private void advanceIfNeeded() {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
 
             if (hasNext != null) {
                 return;
@@ -1006,7 +1021,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
             StorageState state = this.state.get();
 
-            assert state == StorageState.CLOSED : state;
+            assert state == StorageState.CLOSED : IgniteStringFormatter.format("{}, state={}", createStorageInfo(), state);
 
             return;
         }
@@ -1083,7 +1098,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         busyLock.block();
 
         try {
-            IgniteUtils.closeAll(getResourcesToCloseOnRebalance());
+            IgniteUtils.closeAll(getResourcesToCloseOnCleanup());
 
             hashIndexes.values().forEach(PageMemoryHashIndexStorage::startRebalance);
             sortedIndexes.values().forEach(PageMemorySortedIndexStorage::startRebalance);
@@ -1120,12 +1135,45 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     public abstract void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) throws StorageException;
 
     /**
-     * Returns resources that will have to close on rebalancing.
+     * Returns resources that will have to close on cleanup.
      */
-    abstract List<AutoCloseable> getResourcesToCloseOnRebalance();
+    abstract List<AutoCloseable> getResourcesToCloseOnCleanup();
 
     /**
      * Sets the RAFT group configuration on rebalance.
      */
     public abstract void committedGroupConfigurationOnRebalance(RaftGroupConfiguration config);
+
+    /**
+     * Prepares the storage and its indexes for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    public void startCleanup() throws Exception {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            IgniteUtils.closeAll(getResourcesToCloseOnCleanup());
+
+            hashIndexes.values().forEach(PageMemoryHashIndexStorage::startCleanup);
+            sortedIndexes.values().forEach(PageMemorySortedIndexStorage::startCleanup);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Finishes cleaning up the storage and its indexes.
+     */
+    public void finishCleanup() {
+        if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+            hashIndexes.values().forEach(PageMemoryHashIndexStorage::finishCleanup);
+            sortedIndexes.values().forEach(PageMemorySortedIndexStorage::finishCleanup);
+        }
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 2c3c54d9a5..2ef58c5553 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.util.List;
 import java.util.UUID;
@@ -39,7 +41,6 @@ import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineView;
 import org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
@@ -103,6 +104,9 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
 
         DataRegion<PersistentPageMemory> dataRegion = tableStorage.dataRegion();
 
+        this.meta = meta;
+        persistedIndex = meta.lastAppliedIndex();
+
         checkpointManager.addCheckpointListener(checkpointListener = new CheckpointListener() {
             @Override
             public void beforeCheckpointBegin(CheckpointProgress progress, @Nullable Executor exec) throws IgniteInternalCheckedException {
@@ -122,8 +126,6 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
             }
         }, dataRegion);
 
-        this.meta = meta;
-
         blobStorage = new BlobStorage(
                 rowVersionFreeList,
                 dataRegion.pageMemory(),
@@ -136,6 +138,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             checkpointTimeoutLock.checkpointReadLock();
 
             try {
@@ -149,6 +153,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
     @Override
     public CompletableFuture<Void> flush() {
         return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
 
             CheckpointProgress scheduledCheckpoint;
@@ -170,18 +176,26 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
 
     @Override
     public long lastAppliedIndex() {
-        return busy(meta::lastAppliedIndex);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return meta.lastAppliedIndex();
+        });
     }
 
     @Override
     public long lastAppliedTerm() {
-        return busy(meta::lastAppliedTerm);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return meta.lastAppliedTerm();
+        });
     }
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
         busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             lastAppliedBusy(lastAppliedIndex, lastAppliedTerm);
 
@@ -201,13 +215,19 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
 
     @Override
     public long persistedIndex() {
-        return busy(() -> persistedIndex);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return persistedIndex;
+        });
     }
 
     @Override
     @Nullable
     public RaftGroupConfiguration committedGroupConfiguration() {
         return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
             try {
                 replicationProtocolGroupConfigReadWriteLock.readLock().lock();
 
@@ -233,7 +253,7 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
         busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             committedGroupConfigurationBusy(config);
 
@@ -345,23 +365,23 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
     }
 
     /**
-     * Updates the internal data structures of the storage and its indexes on rebalance.
+     * Updates the internal data structures of the storage and its indexes on rebalance or cleanup.
      *
      * @param meta Partition meta.
      * @param rowVersionFreeList Free list for {@link RowVersion}.
      * @param indexFreeList Free list fot {@link IndexColumns}.
      * @param versionChainTree Table tree for {@link VersionChain}.
      * @param indexMetaTree Tree that contains SQL indexes' metadata.
-     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     * @throws StorageException If the storage is not in the cleanup or rebalance state.
      */
-    public void updateDataStructuresOnRebalance(
+    public void updateDataStructures(
             PartitionMeta meta,
             RowVersionFreeList rowVersionFreeList,
             IndexColumnsFreeList indexFreeList,
             VersionChainTree versionChainTree,
             IndexMetaTree indexMetaTree
     ) {
-        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+        throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
 
         this.meta = meta;
 
@@ -379,14 +399,14 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
         );
 
         for (PageMemoryHashIndexStorage indexStorage : hashIndexes.values()) {
-            indexStorage.updateDataStructuresOnRebalance(
+            indexStorage.updateDataStructures(
                     indexFreeList,
                     createHashIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
             );
         }
 
         for (PageMemorySortedIndexStorage indexStorage : sortedIndexes.values()) {
-            indexStorage.updateDataStructuresOnRebalance(
+            indexStorage.updateDataStructures(
                     indexFreeList,
                     createSortedIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
             );
@@ -394,7 +414,7 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
     }
 
     @Override
-    List<AutoCloseable> getResourcesToCloseOnRebalance() {
+    List<AutoCloseable> getResourcesToCloseOnCleanup() {
         return List.of(
                 rowVersionFreeList::close,
                 indexFreeList::close,
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 362a3c3529..cc5348e7f9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -33,7 +35,6 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
 import org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
@@ -93,28 +94,44 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return busy(closure::execute);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return closure.execute();
+        });
     }
 
     @Override
     public CompletableFuture<Void> flush() {
-        return busy(() -> completedFuture(null));
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return completedFuture(null);
+        });
     }
 
     @Override
     public long lastAppliedIndex() {
-        return busy(() -> lastAppliedIndex);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return lastAppliedIndex;
+        });
     }
 
     @Override
     public long lastAppliedTerm() {
-        return busy(() -> lastAppliedTerm);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return lastAppliedTerm;
+        });
     }
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
         busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             this.lastAppliedIndex = lastAppliedIndex;
             this.lastAppliedTerm = lastAppliedTerm;
@@ -125,18 +142,26 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
 
     @Override
     public long persistedIndex() {
-        return busy(() -> lastAppliedIndex);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return lastAppliedIndex;
+        });
     }
 
     @Override
     public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
-        return busy(() -> groupConfig);
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+            return groupConfig;
+        });
     }
 
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
         busy(() -> {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+            throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
 
             groupConfig = config;
 
@@ -194,6 +219,10 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
         hashIndexes.values().forEach(indexStorage -> indexStorage.startDestructionOn(destructionExecutor));
         sortedIndexes.values().forEach(indexStorage -> indexStorage.startDestructionOn(destructionExecutor));
 
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+        groupConfig = null;
+
         if (removeIndexDescriptors) {
             hashIndexes.clear();
             sortedIndexes.clear();
@@ -249,35 +278,35 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
     }
 
     @Override
-    List<AutoCloseable> getResourcesToCloseOnRebalance() {
+    List<AutoCloseable> getResourcesToCloseOnCleanup() {
         return List.of(versionChainTree::close, indexMetaTree::close);
     }
 
     /**
-     * Updates the internal data structures of the storage and its indexes on rebalance.
+     * Updates the internal data structures of the storage and its indexes on rebalance or cleanup.
      *
      * @param versionChainTree Table tree for {@link VersionChain}.
      * @param indexMetaTree Tree that contains SQL indexes' metadata.
-     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     * @throws StorageException If failed.
      */
-    public void updateDataStructuresOnRebalance(
+    public void updateDataStructures(
             VersionChainTree versionChainTree,
             IndexMetaTree indexMetaTree
     ) {
-        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+        throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
 
         this.versionChainTree = versionChainTree;
         this.indexMetaTree = indexMetaTree;
 
         for (PageMemoryHashIndexStorage indexStorage : hashIndexes.values()) {
-            indexStorage.updateDataStructuresOnRebalance(
+            indexStorage.updateDataStructures(
                     indexFreeList,
                     createHashIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
             );
         }
 
         for (PageMemorySortedIndexStorage indexStorage : sortedIndexes.values()) {
-            indexStorage.updateDataStructuresOnRebalance(
+            indexStorage.updateDataStructures(
                     indexFreeList,
                     createSortedIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
             );
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4c408fc3e2..4507794523 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -323,7 +323,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
             try {
-                saveLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
+                savePendingLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
 
                 return null;
             } catch (RocksDBException e) {
@@ -332,7 +332,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         });
     }
 
-    private void saveLastApplied(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+    private void savePendingLastApplied(
+            AbstractWriteBatch writeBatch,
+            long lastAppliedIndex,
+            long lastAppliedTerm
+    ) throws RocksDBException {
         writeBatch.put(meta, lastAppliedIndexKey, longToBytes(lastAppliedIndex));
         writeBatch.put(meta, lastAppliedTermKey, longToBytes(lastAppliedTerm));
 
@@ -1477,7 +1481,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         busyLock.block();
 
         try {
-            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+            clearStorage(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
         } catch (RocksDBException e) {
             throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
         } finally {
@@ -1496,7 +1500,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
 
         try {
-            clearStorageOnRebalance(writeBatch, 0, 0);
+            clearStorage(writeBatch, 0, 0);
         } catch (RocksDBException e) {
             throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
         }
@@ -1513,7 +1517,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
 
         try {
-            saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+            saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
 
             saveRaftGroupConfigurationOnRebalance(writeBatch, raftGroupConfig);
         } catch (RocksDBException e) {
@@ -1521,16 +1525,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
     }
 
-    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
-        saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+    private void clearStorage(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        pendingGroupConfig = null;
+        lastGroupConfig = null;
 
         writeBatch.delete(meta, lastGroupConfigKey);
         writeBatch.delete(meta, partitionIdKey(partitionId));
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    private void saveLastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
-        saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+    private void saveLastApplied(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        savePendingLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
 
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
@@ -1543,4 +1550,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         this.lastGroupConfig = config;
     }
+
+    /**
+     * Prepares the storage for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        clearStorage(writeBatch, 0, 0);
+    }
+
+    /**
+     * Finishes cleaning up the storage.
+     */
+    void finishCleanup() {
+        if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+            busyLock.unblock();
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2256c930b4..f6ebf8c7e5 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -670,7 +670,10 @@ public class RocksDbTableStorage implements MvTableStorage {
                 return completedFuture(null);
             } catch (RocksDBException e) {
                 throw new StorageRebalanceException(
-                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        IgniteStringFormatter.format(
+                                "Error when trying to start rebalancing storage: [{}]",
+                                mvPartitionStorage.createStorageInfo()
+                        ),
                         e
                 );
             }
@@ -703,7 +706,10 @@ public class RocksDbTableStorage implements MvTableStorage {
                 return completedFuture(null);
             } catch (RocksDBException e) {
                 throw new StorageRebalanceException(
-                        "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        IgniteStringFormatter.format(
+                                "Error when trying to abort rebalancing storage: [{}]",
+                                mvPartitionStorage.createStorageInfo()
+                        ),
                         e
                 );
             }
@@ -741,13 +747,59 @@ public class RocksDbTableStorage implements MvTableStorage {
                 return completedFuture(null);
             } catch (RocksDBException e) {
                 throw new StorageRebalanceException(
-                        "Error when trying to finish rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        IgniteStringFormatter.format(
+                                "Error when trying to finish rebalancing storage: [{}]",
+                                mvPartitionStorage.createStorageInfo()
+                        ),
                         e
                 );
             }
         });
     }
 
+    @Override
+    public CompletableFuture<Void> clearPartition(int partitionId) {
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            List<RocksDbHashIndexStorage> hashIndexStorages = getHashIndexStorages(partitionId);
+            List<RocksDbSortedIndexStorage> sortedIndexStorages = getSortedIndexStorages(partitionId);
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startCleanup(writeBatch);
+
+                for (RocksDbHashIndexStorage hashIndexStorage : hashIndexStorages) {
+                    hashIndexStorage.startCleanup(writeBatch);
+                }
+
+                for (RocksDbSortedIndexStorage sortedIndexStorage : sortedIndexStorages) {
+                    sortedIndexStorage.startCleanup(writeBatch);
+                }
+
+                db.write(writeOptions, writeBatch);
+            } catch (RocksDBException e) {
+                throw new StorageException(
+                        IgniteStringFormatter.format(
+                                "Error when trying to cleanup storage: [{}]",
+                                mvPartitionStorage.createStorageInfo()
+                        ),
+                        e
+                );
+            } finally {
+                mvPartitionStorage.finishCleanup();
+
+                hashIndexStorages.forEach(RocksDbHashIndexStorage::finishCleanup);
+                sortedIndexStorages.forEach(RocksDbSortedIndexStorage::finishCleanup);
+            }
+
+            return completedFuture(null);
+        });
+    }
+
     /**
      * Returns table name.
      */
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 4e60f1e259..255f96ae02 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -344,4 +344,29 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
         }
     }
+
+    /**
+     * Prepares the storage for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    public void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        destroyData(writeBatch);
+    }
+
+    /**
+     * Finishes cleaning up the storage.
+     */
+    public void finishCleanup() {
+        if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+            busyLock.unblock();
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index c22583f76a..bb7e923122 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -469,4 +469,29 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
         }
     }
+
+    /**
+     * Prepares the storage for cleanup.
+     *
+     * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+     */
+    public void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        destroyData(writeBatch);
+    }
+
+    /**
+     * Finishes cleaning up the storage.
+     */
+    public void finishCleanup() {
+        if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+            busyLock.unblock();
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 581130a468..7bb22da652 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -132,11 +132,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
     public void testDestroyTableStorage() throws Exception {
         super.testDestroyTableStorage();
     }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
-    @Override
-    public void testRestartStoragesAfterFailDuringRebalance() {
-        super.testRestartStoragesAfterFailDuringRebalance();
-    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7a379d8b3c..c2296a8e2a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -752,7 +752,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                             return partitionDataStorageFut
                                     .thenCompose(s -> storageUpdateHandlerFut)
-                                    .thenCompose(s -> getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId))
+                                    .thenCompose(s -> getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId))
                                     .thenAcceptAsync(txStatePartitionStorage -> {
                                         RaftGroupOptions groupOptions = groupOptionsForPartition(
                                                 internalTbl.storage(),
@@ -818,7 +818,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             }
 
                             CompletableFuture<TxStateStorage> txStateStorageFuture =
-                                    getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
+                                    getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId);
 
                             StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
 
@@ -1847,7 +1847,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         StorageUpdateHandler storageUpdateHandler =
                                 new StorageUpdateHandler(partId, partitionDataStorage, tbl.indexStorageAdapters(partId));
 
-                        TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
+                        TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId).join();
 
                         RaftGroupOptions groupOptions = groupOptionsForPartition(
                                 internalTable.storage(),
@@ -2057,8 +2057,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return Future that will complete when the operation completes.
      */
     private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
-        // TODO: IGNITE-18603 Clear if TxStateStorage hasn't been rebalanced yet
-        return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
+        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
+        // TODO: IGNITE-18633 Also think about waiting for index stores for a partition, see PartitionAccessImpl.startRebalance
+        return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
+                .thenCompose(mvPartitionStorage -> {
+                    if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS) {
+                        return mvTableStorage.clearPartition(partitionId).thenApply(unused -> mvPartitionStorage);
+                    } else {
+                        return completedFuture(mvPartitionStorage);
+                    }
+                });
     }
 
     /**
@@ -2068,18 +2076,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * in when the rebalance was interrupted.
      *
      * @param txStateTableStorage Transaction state storage for a table.
-     * @param partId Partition ID.
-     */
-    private static TxStateStorage getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
-        // TODO: IGNITE-18603 Clear if MvPartitionStorage hasn't been rebalanced yet
-        return txStateTableStorage.getOrCreateTxStateStorage(partId);
-    }
-
-    /**
-     * Async version of {@link #getOrCreateTxStateStorage}.
+     * @param partitionId Partition ID.
+     * @return Future that will complete when the operation completes.
      */
-    private CompletableFuture<TxStateStorage> getOrCreateTxStateStorageAsync(TxStateTableStorage txStateTableStorage, int partId) {
-        return CompletableFuture.supplyAsync(() -> getOrCreateTxStateStorage(txStateTableStorage, partId), ioExecutor);
+    private CompletableFuture<TxStateStorage> getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partitionId) {
+        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
+        return CompletableFuture.supplyAsync(() -> txStateTableStorage.getOrCreateTxStateStorage(partitionId), ioExecutor)
+                .thenCompose(txStateStorage -> {
+                    if (txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
+                        return txStateStorage.clear().thenApply(unused -> txStateStorage);
+                    } else {
+                        return completedFuture(txStateStorage);
+                    }
+                });
     }
 
     private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 7938cd43b4..1918108a0d 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -194,4 +194,22 @@ public interface TxStateStorage extends ManuallyCloseable {
      *      has failed.
      */
     CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm);
+
+    /**
+     * Clears transaction state storage. After the cleaning is completed, the storage will be fully available.
+     * <ul>
+     *     <li>Cancels all current operations (including cursors) with storage and waits for their completion;</li>
+     *     <li>Does not allow operations to be performed (exceptions will be thrown) with the storage until the cleaning is completed;</li>
+     *     <li>Clears storage;</li>
+     *     <li>Sets the {@link #lastAppliedIndex()}, {@link #lastAppliedTerm()} and {@link #persistedIndex()} to {@code 0};</li>
+     *     <li>Once the storage cleanup is complete (success or error), allows all storage operations to be performed.</li>
+     * </ul>
+     *
+     * @return Future of transaction state storage cleanup.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_STOPPED_ERR} if the storage is closed or destroyed.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_REBALANCE_ERR} if the storage is in process of rebalance.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} if the storage is in the process of cleanup or
+     *      the operation failed for another reason.
+     */
+    CompletableFuture<Void> clear();
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 5aa7ddfa37..1fff2ab9cd 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -133,27 +133,10 @@ public class TxStateRocksDbStorage implements TxStateStorage {
             byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
 
             if (indexAndTermBytes != null) {
-                long lastAppliedIndex = bytesToLong(indexAndTermBytes);
-
-                if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
-                    try (WriteBatch writeBatch = new WriteBatch()) {
-                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-                        writeBatch.delete(lastAppliedIndexAndTermKey);
-
-                        db.write(writeOptions, writeBatch);
-                    } catch (Exception e) {
-                        throw new IgniteInternalException(
-                                TX_STATE_STORAGE_REBALANCE_ERR,
-                                IgniteStringFormatter.format("Failed to clear storage:", createStorageInfo()),
-                                e
-                        );
-                    }
-                } else {
-                    this.lastAppliedIndex = lastAppliedIndex;
-                    persistedIndex = lastAppliedIndex;
+                lastAppliedIndex = bytesToLong(indexAndTermBytes);
+                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
 
-                    lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
-                }
+                persistedIndex = lastAppliedIndex;
             }
 
             return null;
@@ -231,10 +214,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
                 // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
                 // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
                 if (state.get() != StorageState.REBALANCE) {
-                    writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
-
-                    lastAppliedIndex = commandIndex;
-                    lastAppliedTerm = commandTerm;
+                    updateLastApplied(writeBatch, commandIndex, commandTerm);
                 }
 
                 db.write(writeOptions, writeBatch);
@@ -428,7 +408,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
 
         try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            clearStorageData(writeBatch);
 
             writeBatch.delete(lastAppliedIndexAndTermKey);
 
@@ -483,14 +463,11 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         busyLock.block();
 
         try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+            clearStorageData(writeBatch);
 
-            db.write(writeOptions, writeBatch);
+            updateLastAppliedAndPersistedIndex(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
 
-            lastAppliedIndex = REBALANCE_IN_PROGRESS;
-            lastAppliedTerm = REBALANCE_IN_PROGRESS;
-            persistedIndex = REBALANCE_IN_PROGRESS;
+            db.write(writeOptions, writeBatch);
 
             return completedFuture(null);
         } catch (Exception e) {
@@ -511,7 +488,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
 
         try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            clearStorageData(writeBatch);
+
             writeBatch.delete(lastAppliedIndexAndTermKey);
 
             db.write(writeOptions, writeBatch);
@@ -542,14 +520,10 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         }
 
         try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+            updateLastAppliedAndPersistedIndex(writeBatch, lastAppliedIndex, lastAppliedTerm);
 
             db.write(writeOptions, writeBatch);
 
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-            this.persistedIndex = lastAppliedIndex;
-
             state.set(StorageState.RUNNABLE);
         } catch (Exception e) {
             throw new IgniteInternalException(
@@ -562,6 +536,57 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         return completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> clear() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+            throwExceptionDependingOnStorageState();
+        }
+
+        // We changed the status and wait for all current operations (together with cursors) with the storage to be completed.
+        busyLock.block();
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            clearStorageData(writeBatch);
+
+            updateLastAppliedAndPersistedIndex(writeBatch, 0, 0);
+
+            db.write(writeOptions, writeBatch);
+
+            return completedFuture(null);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_ERR,
+                    IgniteStringFormatter.format("Failed to cleanup storage: [{}]", createStorageInfo()),
+                    e
+            );
+        } finally {
+            state.set(StorageState.RUNNABLE);
+
+            busyLock.unblock();
+        }
+    }
+
+    private void clearStorageData(WriteBatch writeBatch) throws RocksDBException {
+        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void updateLastApplied(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    private void updateLastAppliedAndPersistedIndex(
+            WriteBatch writeBatch,
+            long lastAppliedIndex,
+            long lastAppliedTerm
+    ) throws RocksDBException {
+        updateLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        persistedIndex = lastAppliedIndex;
+    }
+
     /**
      * Tries to close the storage with resources if it hasn't already been closed.
      *
@@ -609,6 +634,11 @@ public class TxStateRocksDbStorage implements TxStateStorage {
                 );
             case REBALANCE:
                 throw createStorageInProgressOfRebalanceException();
+            case CLEANUP:
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Storage is in the process of cleanup: [{}]", createStorageInfo())
+                );
             default:
                 throw new IgniteInternalException(
                         TX_STATE_STORAGE_ERR,
@@ -644,6 +674,9 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         CLOSED,
 
         /** Storage is in the process of being rebalanced. */
-        REBALANCE
+        REBALANCE,
+
+        /** Storage is in the process of cleanup. */
+        CLEANUP
     }
 }
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index 1b62eea05d..b37cf34ccb 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -17,17 +17,15 @@
 
 package org.apache.ignite.internal.tx.storage.state.rocksdb;
 
-import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
 
 import java.nio.file.Path;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -36,7 +34,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.storage.state.AbstractTxStateStorageTest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,19 +61,21 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
     }
 
     @Test
-    void testRestartStorageInProgressOfRebalance() throws Exception {
+    void testRestartStorageInProgressOfRebalance() {
         TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
 
-        fillStorage(
-                storage,
-                List.of(randomTxMetaTuple(1, UUID.randomUUID()), randomTxMetaTuple(1, UUID.randomUUID()))
+        List<IgniteBiTuple<UUID, TxMeta>> rows = List.of(
+                randomTxMetaTuple(1, UUID.randomUUID()),
+                randomTxMetaTuple(1, UUID.randomUUID())
         );
 
-        storage.flush().get(10, TimeUnit.SECONDS);
+        fillStorage(storage, rows);
 
         // We emulate the situation that the rebalancing did not have time to end.
         storage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
 
+        assertThat(storage.flush(), willCompleteSuccessfully());
+
         tableStorage.stop();
 
         tableStorage = createTableStorage();
@@ -85,10 +84,8 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
 
         storage = tableStorage.getOrCreateTxStateStorage(0);
 
-        checkLastApplied(storage, 0, 0, 0);
+        checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
 
-        try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
-            assertThat(scan.stream().collect(toList()), empty());
-        }
+        checkStorageContainsRows(storage, rows);
     }
 }
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 3580d589e4..b748acbd7c 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.storage.state;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
@@ -317,20 +316,15 @@ public abstract class AbstractTxStateStorageTest {
         // Let's check the storage.
         checkLastApplied(storage, 30, 30, 50);
 
-        try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
-            assertThat(
-                    scan.stream().collect(toList()),
-                    containsInAnyOrder(rowsOnRebalance.toArray(new IgniteBiTuple[0]))
-            );
-        }
+        checkStorageContainsRows(storage, rowsOnRebalance);
     }
 
     @Test
-    public void testFailRebalance() throws Exception {
+    public void testFailRebalance() {
         TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
 
         // Nothing will happen because rebalance has not started.
-        storage.abortRebalance().get(1, SECONDS);
+        assertThat(storage.abortRebalance(), willCompleteSuccessfully());
 
         List<IgniteBiTuple<UUID, TxMeta>> rowsBeforeStartRebalance = List.of(
                 randomTxMetaTuple(1, UUID.randomUUID()),
@@ -355,15 +349,13 @@ public abstract class AbstractTxStateStorageTest {
         // Let's check the storage.
         checkLastApplied(storage, 0, 0, 0);
 
-        try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
-            assertThat(scan.stream().collect(toList()), is(empty()));
-        }
+        checkStorageIsEmpty(storage);
     }
 
     @Test
     public void testStartRebalanceForClosedOrDestroedPartition() {
         TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
-        TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(0);
+        TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
 
         storage0.close();
         storage1.destroy();
@@ -372,6 +364,64 @@ public abstract class AbstractTxStateStorageTest {
         assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage1::startRebalance);
     }
 
+    /** Checks that {@code clear()} succeeds on both an empty and a filled storage and leaves it empty with zero indexes. */
+    @Test
+    void testClear() {
+        TxStateStorage txStateStorage = tableStorage.getOrCreateTxStateStorage(0);
+
+        // Clearing a storage that holds no rows must complete without errors.
+        assertThat(txStateStorage.clear(), willCompleteSuccessfully());
+
+        checkLastApplied(txStateStorage, 0, 0, 0);
+        checkStorageIsEmpty(txStateStorage);
+
+        fillStorage(
+                txStateStorage,
+                List.of(randomTxMetaTuple(1, UUID.randomUUID()), randomTxMetaTuple(1, UUID.randomUUID()))
+        );
+
+        // Clearing a storage that holds rows must remove them and reset the indexes.
+        assertThat(txStateStorage.clear(), willCompleteSuccessfully());
+
+        checkLastApplied(txStateStorage, 0, 0, 0);
+        checkStorageIsEmpty(txStateStorage);
+    }
+
+    /** Checks that {@code clear()} is rejected for a closed storage, a destroyed storage and a storage being rebalanced. */
+    @Test
+    void testCleanOnClosedDestroyedAndRebalancedStorages() {
+        TxStateStorage closedStorage = tableStorage.getOrCreateTxStateStorage(0);
+        TxStateStorage destroyedStorage = tableStorage.getOrCreateTxStateStorage(1);
+        TxStateStorage rebalancingStorage = tableStorage.getOrCreateTxStateStorage(2);
+
+        closedStorage.close();
+        destroyedStorage.destroy();
+        assertThat(rebalancingStorage.startRebalance(), willCompleteSuccessfully());
+
+        try {
+            assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, closedStorage::clear);
+            assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, destroyedStorage::clear);
+            assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, rebalancingStorage::clear);
+        } finally {
+            // The started rebalance is always aborted, even when one of the assertions above fails.
+            assertThat(rebalancingStorage.abortRebalance(), willCompleteSuccessfully());
+        }
+    }
+
+    /**
+     * Asserts that a scan over the storage returns no rows.
+     *
+     * @param storage Storage to scan.
+     */
+    private static void checkStorageIsEmpty(TxStateStorage storage) {
+        try (Cursor<IgniteBiTuple<UUID, TxMeta>> cursor = storage.scan()) {
+            List<IgniteBiTuple<UUID, TxMeta>> actualRows = cursor.stream().collect(toList());
+
+            assertThat(actualRows, is(empty()));
+        }
+    }
+
+    /**
+     * Asserts that a scan over the storage returns exactly the expected rows, in any order.
+     *
+     * @param storage Storage to scan.
+     * @param expRows Rows expected to be found in the storage.
+     */
+    protected static void checkStorageContainsRows(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> expRows) {
+        try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
+            IgniteBiTuple[] expected = expRows.toArray(new IgniteBiTuple[0]);
+
+            assertThat(scan.stream().collect(toList()), containsInAnyOrder(expected));
+        }
+    }
+
     private void startRebalanceWithChecks(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
         fillStorage(storage, rows);
 
@@ -446,8 +496,7 @@ public abstract class AbstractTxStateStorageTest {
         for (int i = 0; i < rows.size(); i++) {
             IgniteBiTuple<UUID, TxMeta> row = rows.get(i);
 
-            // If even.
-            if ((i & 1) == 0) {
+            if ((i % 2) == 0) {
                 assertTrue(storage.compareAndSet(row.get1(), null, row.get2(), i * 10L, i * 10L));
             } else {
                 storage.put(row.get1(), row.get2());
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index ebbf24309f..a8eee03a35 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -231,6 +231,22 @@ public class TestTxStateStorage implements TxStateStorage {
                 .whenComplete((unused, throwable) -> lastApplied(lastAppliedIndex, lastAppliedTerm));
     }
 
+    @Override
+    public CompletableFuture<Void> clear() {
+        checkStorageClosed();
+
+        if (rebalanceFutureReference.get() != null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "In the process of rebalancing");
+        }
+
+        storage.clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+
+        return completedFuture(null);
+    }
+
     private void checkStorageInProgreesOfRebalance() {
         if (rebalanceFutureReference.get() != null) {
             throwRebalanceInProgressException();