You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/19 13:18:36 UTC

[ignite-3] branch main updated: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cb222a716 IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)
2cb222a716 is described below

commit 2cb222a71685c500558844084f339a4af71d093a
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Jan 19 16:18:29 2023 +0300

    IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)
---
 .../internal/storage/impl/TestMvTableStorage.java  |  15 +-
 .../ignite/internal/storage/rocksdb/HashIndex.java |  10 ++
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 171 ++++++++++++++++++---
 .../storage/rocksdb/RocksDbTableStorage.java       | 154 ++++++++++++++++---
 .../internal/storage/rocksdb/SortedIndex.java      |  10 ++
 .../rocksdb/index/RocksDbHashIndexStorage.java     | 110 +++++++++++--
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  87 ++++++++++-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  21 ---
 8 files changed, 482 insertions(+), 96 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 45e59cfa97..dac39a6201 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
 
 import java.util.Map;
 import java.util.Objects;
@@ -163,7 +164,7 @@ public class TestMvTableStorage implements MvTableStorage {
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
         if (!partitions.containsKey(partitionId)) {
-            throw new StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
+            throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
         }
 
         SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
@@ -177,7 +178,7 @@ public class TestMvTableStorage implements MvTableStorage {
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
         if (!partitions.containsKey(partitionId)) {
-            throw new StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
+            throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
         }
 
         HashIndices sortedIndices = hashIndicesById.computeIfAbsent(
@@ -249,7 +250,7 @@ public class TestMvTableStorage implements MvTableStorage {
         TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
 
         if (partitionStorage == null) {
-            throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+            throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
         }
 
         assert !destroyFutureByPartitionId.containsKey(partitionId) : partitionId;
@@ -292,7 +293,7 @@ public class TestMvTableStorage implements MvTableStorage {
         TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
 
         if (partitionStorage == null) {
-            throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+            throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
         }
 
         return rebalanceFuture
@@ -318,7 +319,7 @@ public class TestMvTableStorage implements MvTableStorage {
         TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
 
         if (partitionStorage == null) {
-            throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+            throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
         }
 
         return rebalanceFuture
@@ -344,10 +345,6 @@ public class TestMvTableStorage implements MvTableStorage {
         }
     }
 
-    private static String createPartitionDoesNotExistsErrorMessage(int partitionId) {
-        return "Partition ID " + partitionId + " does not exist";
-    }
-
     private Stream<TestHashIndexStorage> testHashIndexStorageStream(Integer partitionId) {
         return hashIndicesById.values().stream()
                 .map(hashIndices -> hashIndices.storageByPartitionId.get(partitionId))
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index f055bc6bcc..1d15af522f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
@@ -73,4 +74,13 @@ class HashIndex {
             hashIndex.destroyData(writeBatch);
         }
     }
+
+    /**
+     * Returns hash index storage for partition.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable RocksDbHashIndexStorage get(int partitionId) {
+        return storages.get(partitionId);
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2dbc8e4991..ccd6a421c5 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -22,9 +22,14 @@ import static java.nio.ByteBuffer.allocateDirect;
 import static java.util.Arrays.copyOf;
 import static java.util.Arrays.copyOfRange;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.partitionIdKey;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes;
 import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
@@ -35,10 +40,10 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -47,14 +52,17 @@ import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
+import org.rocksdb.AbstractWriteBatch;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
@@ -210,11 +218,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Busy lock to stop synchronously. */
+    /** Busy lock. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    private final AtomicBoolean stopGuard = new AtomicBoolean();
+    /** Current state of the storage. */
+    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
 
     /**
      * Constructor.
@@ -245,7 +253,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         persistedIndex = lastAppliedIndex;
     }
 
-    /** {@inheritDoc} */
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
         if (threadLocalWriteBatch.get() != null) {
@@ -314,14 +321,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
         busy(() -> {
-            WriteBatchWithIndex writeBatch = requireWriteBatch();
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
             try {
-                writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
-                writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
-
-                pendingAppliedIndex = lastAppliedIndex;
-                pendingAppliedTerm = lastAppliedTerm;
+                saveLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
 
                 return null;
             } catch (RocksDBException e) {
@@ -330,6 +333,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         });
     }
 
+    private void saveLastApplied(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        writeBatch.put(meta, lastAppliedIndexKey, longToBytes(lastAppliedIndex));
+        writeBatch.put(meta, lastAppliedTermKey, longToBytes(lastAppliedTerm));
+
+        pendingAppliedIndex = lastAppliedIndex;
+        pendingAppliedTerm = lastAppliedTerm;
+    }
+
     @Override
     public long persistedIndex() {
         return busy(() -> persistedIndex);
@@ -344,6 +355,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
         busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             try {
@@ -512,6 +525,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -593,6 +608,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             if (rowId.partitionId() != partitionId) {
                 throw new IllegalArgumentException(
                         String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
@@ -810,6 +827,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
             byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
@@ -830,7 +849,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             it.seek(lowerBound);
 
-            return new BusyRocksIteratorAdapter<ReadResult>(busyLock, it) {
+            return new RocksIteratorAdapter<ReadResult>(it) {
                 @Override
                 protected ReadResult decodeEntry(byte[] key, byte[] value) {
                     int keyLength = key.length;
@@ -841,8 +860,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 }
 
                 @Override
-                protected void handleBusyFail() {
-                    throw new StorageClosedException();
+                public boolean hasNext() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+                        return super.hasNext();
+                    });
+                }
+
+                @Override
+                public ReadResult next() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+                        return super.next();
+                    });
                 }
 
                 @Override
@@ -861,6 +893,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         Objects.requireNonNull(timestamp, "timestamp is null");
 
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             if (lookingForLatestVersions(timestamp)) {
                 return new ScanLatestVersionsCursor();
             } else {
@@ -883,6 +917,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
 
             try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
@@ -940,6 +976,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public long rowsCount() {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             try (
                     var upperBound = new Slice(partitionEndPrefix());
                     var options = new ReadOptions().setIterateUpperBound(upperBound);
@@ -967,15 +1005,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
@@ -1176,12 +1217,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         @Override
         public boolean hasNext() {
-            return busy(this::hasNextBusy);
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+                return hasNextBusy();
+            });
         }
 
         @Override
         public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
             return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
                 Objects.requireNonNull(timestamp, "timestamp is null");
 
                 if (currentRowId == null) {
@@ -1205,6 +1252,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         @Override
         public final ReadResult next() {
             return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
                 if (!hasNextBusy()) {
                     throw new NoSuchElementException();
                 }
@@ -1396,7 +1445,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     private <V> V busy(Supplier<V> supplier) {
         if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
         }
 
         try {
@@ -1405,4 +1454,84 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void saveLastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+
+        persistedIndex = lastAppliedIndex;
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index dfbf279ebf..811a30dcc0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
+import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.io.IOException;
@@ -34,6 +35,8 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +52,7 @@ import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -57,9 +61,10 @@ import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.ColumnFamilyType;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -127,7 +132,9 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    private final ConcurrentMap<Integer, CompletableFuture<Void>> partitionIdDestroyFutureMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();
+
+    private final Set<Integer> rebalancePartitions = ConcurrentHashMap.newKeySet();
 
     /**
      * Constructor.
@@ -252,7 +259,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
                         default:
                             throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
-                                    + tableCfg.value().name() + ']');
+                                    + getTableName() + ']');
                     }
                 }
 
@@ -407,9 +414,12 @@ public class RocksDbTableStorage implements MvTableStorage {
         return inBusyLock(busyLock, () -> {
             checkPartitionId(partitionId);
 
+            assert !rebalancePartitions.contains(partitionId)
+                    : IgniteStringFormatter.format("table={}, partitionId={}", getTableName(), partitionId);
+
             CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
 
-            CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
+            CompletableFuture<Void> previousDestroyPartitionFuture = destroyFutureByPartitionId.putIfAbsent(
                     partitionId,
                     destroyPartitionFuture
             );
@@ -442,16 +452,16 @@ public class RocksDbTableStorage implements MvTableStorage {
 
                     flushFuture.whenComplete((unused, throwable) -> {
                         if (throwable == null) {
-                            partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+                            destroyFutureByPartitionId.remove(partitionId).complete(null);
                         } else {
-                            partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+                            destroyFutureByPartitionId.remove(partitionId).completeExceptionally(throwable);
                         }
                     });
                 } catch (Throwable throwable) {
-                    partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+                    destroyFutureByPartitionId.remove(partitionId).completeExceptionally(throwable);
                 }
             } else {
-                partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+                destroyFutureByPartitionId.remove(partitionId).complete(null);
             }
 
             return destroyPartitionFuture;
@@ -466,7 +476,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
 
             if (partitionStorage == null) {
-                throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
             }
 
             return storages.getOrCreateStorage(partitionStorage);
@@ -545,15 +555,15 @@ public class RocksDbTableStorage implements MvTableStorage {
     /**
      * Checks that a passed partition id is within the proper bounds.
      *
-     * @param partId Partition id.
+     * @param partitionId Partition id.
      */
-    private void checkPartitionId(int partId) {
-        if (partId < 0 || partId >= partitions.length()) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partId, false,
-                    "partitions", partitions.length(), false
+    private void checkPartitionId(int partitionId) {
+        if (partitionId < 0 || partitionId >= partitions.length()) {
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured range [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions.length()
             ));
         }
     }
@@ -618,7 +628,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                 return sortedIndexCfDescriptor(cfName, indexDescriptor);
 
             default:
-                throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.value().name() + ']');
+                throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + getTableName() + ']');
         }
     }
 
@@ -635,19 +645,115 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                boolean added = rebalancePartitions.add(partitionId);
+
+                assert added : mvPartitionStorage.createStorageInfo();
+
+                return completedFuture(null);
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            boolean removed = rebalancePartitions.remove(partitionId);
+
+            if (!removed) {
+                return completedFuture(null);
+            }
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.abortReblance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                return completedFuture(null);
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            boolean removed = rebalancePartitions.remove(partitionId);
+
+            if (!removed) {
+                throw new StorageRebalanceException("Rebalance for partition did not start: " + mvPartitionStorage.createStorageInfo());
+            }
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+                getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance);
+                getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance);
+
+                db.write(writeOptions, writeBatch);
+
+                return completedFuture(null);
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to finish rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
+    }
+
+    /**
+     * Returns table name.
+     */
+    String getTableName() {
+        return tableCfg.name().value();
+    }
+
+    private List<RocksDbHashIndexStorage> getHashIndexStorages(int partitionId) {
+        return hashIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
+    }
+
+    private List<RocksDbSortedIndexStorage> getSortedIndexStorages(int partitionId) {
+        return sortedIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index a6dfd44671..9c7f425119 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
@@ -91,4 +92,13 @@ class SortedIndex implements ManuallyCloseable {
     public void close() {
         indexCf.handle().close();
     }
+
+    /**
+     * Returns sorted index storage for partition.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable RocksDbSortedIndexStorage get(int partitionId) {
+        return storages.get(partitionId);
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index cb43a95e52..c9c80e02af 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -18,28 +18,33 @@
 package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.HashUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
@@ -79,11 +84,11 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
      */
     private final byte[] constantPrefix;
 
-    /** Busy lock to stop synchronously. */
+    /** Busy lock. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    private final AtomicBoolean stopGuard = new AtomicBoolean();
+    /** Current state of the storage. */
+    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
 
     /**
      * Creates a new Hash Index storage.
@@ -119,6 +124,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
     @Override
     public Cursor<RowId> get(BinaryTuple key) {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             byte[] rangeStart = rocksPrefix(key);
 
             byte[] rangeEnd = incrementArray(rangeStart);
@@ -131,12 +138,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
 
             it.seek(rangeStart);
 
-            return new BusyRocksIteratorAdapter<RowId>(busyLock, it) {
-                @Override
-                protected void handleBusyFail() {
-                    throw new StorageClosedException();
-                }
-
+            return new RocksIteratorAdapter<RowId>(it) {
                 @Override
                 protected RowId decodeEntry(byte[] key, byte[] value) {
                     // RowId UUID is located at the last 16 bytes of the key
@@ -146,6 +148,24 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
                     return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
                 }
 
+                @Override
+                public boolean hasNext() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbHashIndexStorage.this::createStorageInfo);
+
+                        return super.hasNext();
+                    });
+                }
+
+                @Override
+                public RowId next() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbHashIndexStorage.this::createStorageInfo);
+
+                        return super.next();
+                    });
+                }
+
                 @Override
                 public void close() {
                     super.close();
@@ -174,6 +194,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
     @Override
     public void remove(IndexRow row) {
         busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             try {
                 WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
@@ -189,6 +211,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
     @Override
     public void destroy() {
         busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             byte[] rangeEnd = incrementArray(constantPrefix);
 
             assert rangeEnd != null;
@@ -231,7 +255,11 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
@@ -253,7 +281,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
 
     private <V> V busy(Supplier<V> supplier) {
         if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
         }
 
         try {
@@ -262,4 +290,58 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {
+        return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionStorage.partitionId());
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            destroyData(writeBatch);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    public void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            destroyData(writeBatch);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    public void finishRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index a66568f2cd..bedad0045e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -18,13 +18,16 @@
 package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -33,16 +36,18 @@ import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
@@ -74,11 +79,11 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
     private final RocksDbMvPartitionStorage partitionStorage;
 
-    /** Busy lock to stop synchronously. */
+    /** Busy lock. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    private final AtomicBoolean stopGuard = new AtomicBoolean();
+    /** Current state of the storage. */
+    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
 
     /**
      * Creates a storage.
@@ -105,6 +110,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
 
             return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
@@ -129,6 +136,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
     @Override
     public void remove(IndexRow row) {
         busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             try {
                 WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
@@ -144,6 +153,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
     @Override
     public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
         return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
             boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
             boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
 
@@ -242,6 +253,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
             @Override
             public @Nullable T peek() {
                 return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbSortedIndexStorage.this::createStorageInfo);
+
                     if (hasNext != null) {
                         if (hasNext) {
                             return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
@@ -263,6 +276,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
             }
 
             private void advanceIfNeeded() throws StorageException {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbSortedIndexStorage.this::createStorageInfo);
+
                 if (hasNext != null) {
                     return;
                 }
@@ -363,7 +378,11 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
      * Closes the sorted index storage.
      */
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
@@ -387,7 +406,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
     private <V> V busy(Supplier<V> supplier) {
         if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
+            throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
         }
 
         try {
@@ -396,4 +415,58 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {
+        return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionStorage.partitionId());
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            destroyData(writeBatch);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    public void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            destroyData(writeBatch);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    public void finishRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 3a56c297de..952a104876 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -126,27 +126,6 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
         assertThat(tableStorage.isVolatile(), is(false));
     }
 
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
-    @Override
-    public void testSuccessRebalance() throws Exception {
-        super.testSuccessRebalance();
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
-    @Override
-    public void testFailRebalance() throws Exception {
-        super.testFailRebalance();
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
-    @Override
-    public void testStartRebalanceForClosedPartition() {
-        super.testStartRebalanceForClosedPartition();
-    }
-
     @Test
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-18523")
     @Override