You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/18 10:53:08 UTC

[ignite-3] branch main updated: IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2995285758 IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)
2995285758 is described below

commit 29952857580e4eb4d24a902af03584bebf7083c0
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 18 13:53:02 2023 +0300

    IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)
---
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 489 ++++++++++-----------
 .../storage/rocksdb/RocksDbTableStorage.java       | 287 ++++++------
 .../rocksdb/index/RocksDbHashIndexStorage.java     | 127 +++---
 .../rocksdb/index/RocksDbSortedIndexStorage.java   | 109 ++---
 4 files changed, 493 insertions(+), 519 deletions(-)

diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3523066ce6..2dbc8e4991 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -251,41 +251,36 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         if (threadLocalWriteBatch.get() != null) {
             return closure.execute();
         } else {
-            if (!busyLock.enterBusy()) {
-                throw new StorageClosedException();
-            }
-
-            try (var writeBatch = new WriteBatchWithIndex()) {
-                threadLocalWriteBatch.set(writeBatch);
+            return busy(() -> {
+                try (var writeBatch = new WriteBatchWithIndex()) {
+                    threadLocalWriteBatch.set(writeBatch);
 
-                pendingAppliedIndex = lastAppliedIndex;
-                pendingAppliedTerm = lastAppliedTerm;
-                pendingGroupConfig = lastGroupConfig;
+                    pendingAppliedIndex = lastAppliedIndex;
+                    pendingAppliedTerm = lastAppliedTerm;
+                    pendingGroupConfig = lastGroupConfig;
 
-                V res = closure.execute();
+                    V res = closure.execute();
 
-                try {
-                    if (writeBatch.count() > 0) {
-                        db.write(writeOpts, writeBatch);
+                    try {
+                        if (writeBatch.count() > 0) {
+                            db.write(writeOpts, writeBatch);
+                        }
+                    } catch (RocksDBException e) {
+                        throw new StorageException("Unable to apply a write batch to RocksDB instance.", e);
                     }
-                } catch (RocksDBException e) {
-                    throw new StorageException("Unable to apply a write batch to RocksDB instance.", e);
-                }
 
-                lastAppliedIndex = pendingAppliedIndex;
-                lastAppliedTerm = pendingAppliedTerm;
-                lastGroupConfig = pendingGroupConfig;
+                    lastAppliedIndex = pendingAppliedIndex;
+                    lastAppliedTerm = pendingAppliedTerm;
+                    lastGroupConfig = pendingGroupConfig;
 
-                return res;
-            } finally {
-                threadLocalWriteBatch.set(null);
-
-                busyLock.leaveBusy();
-            }
+                    return res;
+                } finally {
+                    threadLocalWriteBatch.set(null);
+                }
+            });
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> flush() {
         return busy(() -> tableStorage.awaitFlush(true));
@@ -318,23 +313,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
-        busy(() -> lastAppliedBusy(lastAppliedIndex, lastAppliedTerm));
-    }
-
-    private Void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
+        busy(() -> {
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-        try {
-            writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
-            writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
+            try {
+                writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
+                writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
 
-            pendingAppliedIndex = lastAppliedIndex;
-            pendingAppliedTerm = lastAppliedTerm;
+                pendingAppliedIndex = lastAppliedIndex;
+                pendingAppliedTerm = lastAppliedTerm;
 
-            return null;
-        } catch (RocksDBException e) {
-            throw new StorageException(e);
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException(e);
+            }
+        });
     }
 
     @Override
@@ -350,21 +343,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
-        busy(() -> committedGroupConfigurationBusy(config));
-    }
+        busy(() -> {
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-    private Void committedGroupConfigurationBusy(RaftGroupConfiguration config) {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
-
-        try {
-            writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
+            try {
+                writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
 
-            pendingGroupConfig = config;
+                pendingGroupConfig = config;
 
-            return null;
-        } catch (RocksDBException e) {
-            throw new StorageException(e);
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException(e);
+            }
+        });
     }
 
     /**
@@ -439,54 +430,51 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
-        return busy(() -> addWriteBusy(rowId, row, txId, commitTableId, commitPartitionId));
-    }
-
-    private @Nullable BinaryRow addWriteBusy(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
-            throws TxIdMismatchException, StorageException {
-        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
+        return busy(() -> {
+            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
-        BinaryRow res = null;
+            BinaryRow res = null;
 
-        try {
-            // Check concurrent transaction data.
-            byte[] keyBufArray = keyBuf.array();
+            try {
+                // Check concurrent transaction data.
+                byte[] keyBufArray = keyBuf.array();
 
-            byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
+                byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
 
-            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+                byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
 
-            // Previous value must belong to the same transaction.
-            if (previousValue != null) {
-                validateTxId(previousValue, txId);
+                // Previous value must belong to the same transaction.
+                if (previousValue != null) {
+                    validateTxId(previousValue, txId);
 
-                res = wrapValueIntoBinaryRow(previousValue, true);
-            }
+                    res = wrapValueIntoBinaryRow(previousValue, true);
+                }
 
-            if (row == null) {
-                // Write empty value as a tombstone.
-                if (previousValue != null) {
-                    // Reuse old array with transaction id already written to it.
-                    writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
-                } else {
-                    byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
+                if (row == null) {
+                    // Write empty value as a tombstone.
+                    if (previousValue != null) {
+                        // Reuse old array with transaction id already written to it.
+                        writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
+                    } else {
+                        byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
 
-                    putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET);
-                    putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
-                    putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
+                        putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET);
+                        putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
+                        putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
 
-                    writeBatch.put(cf, keyBytes, valueHeaderBytes);
+                        writeBatch.put(cf, keyBytes, valueHeaderBytes);
+                    }
+                } else {
+                    writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
                 }
-            } else {
-                writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to update a row in storage", e);
             }
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to update a row in storage", e);
-        }
 
-        return res;
+            return res;
+        });
     }
 
     /**
@@ -523,120 +511,112 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
-        return busy(() -> abortWriteBusy(rowId));
-    }
-
-    private @Nullable BinaryRow abortWriteBusy(RowId rowId) throws StorageException {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
+        return busy(() -> {
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
-        try {
-            byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+            try {
+                byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
 
-            byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+                byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
 
-            if (previousValue == null) {
-                //the chain doesn't contain an uncommitted write intent
-                return null;
-            }
+                if (previousValue == null) {
+                    //the chain doesn't contain an uncommitted write intent
+                    return null;
+                }
 
-            // Perform unconditional remove for the key without associated timestamp.
-            writeBatch.delete(cf, keyBytes);
+                // Perform unconditional remove for the key without associated timestamp.
+                writeBatch.delete(cf, keyBytes);
 
-            return wrapValueIntoBinaryRow(previousValue, true);
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to roll back insert/update", e);
-        }
+                return wrapValueIntoBinaryRow(previousValue, true);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to roll back insert/update", e);
+            }
+        });
     }
 
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        busy(() -> commitWriteBusy(rowId, timestamp));
-    }
-
-    private Void commitWriteBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
+        busy(() -> {
+            WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
-        try {
-            // Read a value associated with pending write.
-            byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+            try {
+                // Read a value associated with pending write.
+                byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
 
-            byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
+                byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
 
-            if (valueBytes == null) {
-                //the chain doesn't contain an uncommitted write intent
-                return null;
-            }
+                if (valueBytes == null) {
+                    //the chain doesn't contain an uncommitted write intent
+                    return null;
+                }
 
-            // Delete pending write.
-            writeBatch.delete(cf, uncommittedKeyBytes);
+                // Delete pending write.
+                writeBatch.delete(cf, uncommittedKeyBytes);
 
-            // Add timestamp to the key, and put the value back into the storage.
-            putTimestamp(keyBuf, timestamp);
+                // Add timestamp to the key, and put the value back into the storage.
+                putTimestamp(keyBuf, timestamp);
 
-            writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
 
-            return null;
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to commit row into storage", e);
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to commit row into storage", e);
+            }
+        });
     }
 
     @Override
     public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
-        busy(() -> addWriteCommittedBusy(rowId, row, commitTimestamp));
-    }
+        busy(() -> {
+            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-    private Void addWriteCommittedBusy(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
-        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
-
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-        putTimestamp(keyBuf, commitTimestamp);
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            putTimestamp(keyBuf, commitTimestamp);
 
-        //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
-        byte[] rowBytes = rowBytes(row);
+            //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+            byte[] rowBytes = rowBytes(row);
 
-        try {
-            writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+            try {
+                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
 
-            return null;
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to update a row in storage", e);
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to update a row in storage", e);
+            }
+        });
     }
 
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        return busy(() -> readBusy(rowId, timestamp));
-    }
-
-    private ReadResult readBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        if (rowId.partitionId() != partitionId) {
-            throw new IllegalArgumentException(
-                    String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
-        }
-
-        // We can read data outside of consistency closure. Batch is not required.
-        WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+        return busy(() -> {
+            if (rowId.partitionId() != partitionId) {
+                throw new IllegalArgumentException(
+                        String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
+            }
 
-        try (
-                // Set next partition as an upper bound.
-                RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
-                // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
-                // It's not documented, but this is exactly how it should be used.
-                RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
-                        ? writeBatch.newIteratorWithBase(cf, baseIterator)
-                        : baseIterator
-        ) {
-            if (lookingForLatestVersions(timestamp)) {
-                return readLatestVersion(rowId, seekIterator);
-            } else {
-                return readByTimestamp(seekIterator, rowId, timestamp);
+            // We can read data outside of consistency closure. Batch is not required.
+            WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+
+            try (
+                    // Set next partition as an upper bound.
+                    RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
+                    // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
+                    // It's not documented, but this is exactly how it should be used.
+                    RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
+                            ? writeBatch.newIteratorWithBase(cf, baseIterator)
+                            : baseIterator
+            ) {
+                if (lookingForLatestVersions(timestamp)) {
+                    return readLatestVersion(rowId, seekIterator);
+                } else {
+                    return readByTimestamp(seekIterator, rowId, timestamp);
+                }
             }
-        }
+        });
     }
 
     private static boolean lookingForLatestVersions(HybridTimestamp timestamp) {
@@ -829,56 +809,53 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        return busy(() -> scanVersionsBusy(rowId));
-    }
-
-    private Cursor<ReadResult> scanVersionsBusy(RowId rowId) throws StorageException {
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+        return busy(() -> {
+            ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
-        byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+            byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
 
-        incrementRowId(keyBuf);
+            incrementRowId(keyBuf);
 
-        Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+            Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
 
-        var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+            var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
 
-        RocksIterator it = db.newIterator(cf, options);
+            RocksIterator it = db.newIterator(cf, options);
 
-        WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+            WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
 
-        if (writeBatch != null && writeBatch.count() > 0) {
-            it = writeBatch.newIteratorWithBase(cf, it);
-        }
+            if (writeBatch != null && writeBatch.count() > 0) {
+                it = writeBatch.newIteratorWithBase(cf, it);
+            }
 
-        it.seek(lowerBound);
+            it.seek(lowerBound);
 
-        return new BusyRocksIteratorAdapter<>(busyLock, it) {
-            @Override
-            protected ReadResult decodeEntry(byte[] key, byte[] value) {
-                int keyLength = key.length;
+            return new BusyRocksIteratorAdapter<ReadResult>(busyLock, it) {
+                @Override
+                protected ReadResult decodeEntry(byte[] key, byte[] value) {
+                    int keyLength = key.length;
 
-                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+                    boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
-            }
+                    return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
+                }
 
-            @Override
-            protected void handleBusyFail() {
-                throw new StorageClosedException();
-            }
+                @Override
+                protected void handleBusyFail() {
+                    throw new StorageClosedException();
+                }
 
-            @Override
-            public void close() {
-                super.close();
+                @Override
+                public void close() {
+                    super.close();
 
-                RocksUtils.closeAll(options, upperBound);
-            }
-        };
+                    RocksUtils.closeAll(options, upperBound);
+                }
+            };
+        });
     }
 
     // TODO: IGNITE-16914 Play with prefix settings and benchmark results.
-    /** {@inheritDoc} */
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
         Objects.requireNonNull(timestamp, "timestamp is null");
@@ -905,29 +882,27 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        return busy(() -> closestRowIdBusy(lowerBound));
-    }
-
-    private @Nullable RowId closestRowIdBusy(RowId lowerBound) throws StorageException {
-        ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
+        return busy(() -> {
+            ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
 
-        try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
-            it.seek(keyBuf);
+            try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
+                it.seek(keyBuf);
 
-            if (!it.isValid()) {
-                RocksUtils.checkIterator(it);
+                if (!it.isValid()) {
+                    RocksUtils.checkIterator(it);
 
-                return null;
-            }
+                    return null;
+                }
 
-            ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
+                ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
 
-            it.key(readKeyBuf);
+                it.key(readKeyBuf);
 
-            return getRowId(readKeyBuf);
-        } finally {
-            keyBuf.limit(MAX_KEY_SIZE);
-        }
+                return getRowId(readKeyBuf);
+            } finally {
+                keyBuf.limit(MAX_KEY_SIZE);
+            }
+        });
     }
 
     private static void incrementRowId(ByteBuffer buf) {
@@ -964,26 +939,24 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public long rowsCount() {
-        return busy(this::rowsCountBusy);
-    }
+        return busy(() -> {
+            try (
+                    var upperBound = new Slice(partitionEndPrefix());
+                    var options = new ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, options)
+            ) {
+                it.seek(partitionStartPrefix());
 
-    private long rowsCountBusy() {
-        try (
-                var upperBound = new Slice(partitionEndPrefix());
-                var options = new ReadOptions().setIterateUpperBound(upperBound);
-                RocksIterator it = db.newIterator(cf, options)
-        ) {
-            it.seek(partitionStartPrefix());
+                long size = 0;
 
-            long size = 0;
+                while (it.isValid()) {
+                    ++size;
+                    it.next();
+                }
 
-            while (it.isValid()) {
-                ++size;
-                it.next();
+                return size;
             }
-
-            return size;
-        }
+        });
     }
 
     /**
@@ -1208,44 +1181,40 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         @Override
         public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
-            return busy(() -> committedBusy(timestamp));
-        }
-
-        private @Nullable BinaryRow committedBusy(HybridTimestamp timestamp) {
-            Objects.requireNonNull(timestamp, "timestamp is null");
+            return busy(() -> {
+                Objects.requireNonNull(timestamp, "timestamp is null");
 
-            if (currentRowId == null) {
-                throw new IllegalStateException("currentRowId is null");
-            }
+                if (currentRowId == null) {
+                    throw new IllegalStateException("currentRowId is null");
+                }
 
-            setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+                setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
 
-            it.seek(seekKeyBuf.array());
+                it.seek(seekKeyBuf.array());
 
-            ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+                ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
 
-            if (readResult.isEmpty()) {
-                return null;
-            }
+                if (readResult.isEmpty()) {
+                    return null;
+                }
 
-            return readResult.binaryRow();
+                return readResult.binaryRow();
+            });
         }
 
         @Override
         public final ReadResult next() {
-            return busy(this::nextBusy);
-        }
-
-        private ReadResult nextBusy() {
-            if (!hasNextBusy()) {
-                throw new NoSuchElementException();
-            }
+            return busy(() -> {
+                if (!hasNextBusy()) {
+                    throw new NoSuchElementException();
+                }
 
-            ReadResult res = next;
+                ReadResult res = next;
 
-            next = null;
+                next = null;
 
-            return res;
+                return res;
+            });
         }
 
         @Override
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 66ab805e79..dfbf279ebf 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -190,84 +191,86 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public void start() throws StorageException {
-        flusher = new RocksDbFlusher(
-                busyLock,
-                engine.scheduledPool(),
-                engine().threadPool(),
-                engine.configuration().flushDelayMillis()::value,
-                this::refreshPersistedIndexes
-        );
+        inBusyLock(busyLock, () -> {
+            flusher = new RocksDbFlusher(
+                    busyLock,
+                    engine.scheduledPool(),
+                    engine().threadPool(),
+                    engine.configuration().flushDelayMillis()::value,
+                    this::refreshPersistedIndexes
+            );
 
-        try {
-            Files.createDirectories(tablePath);
-        } catch (IOException e) {
-            throw new StorageException("Failed to create a directory for the table storage", e);
-        }
+            try {
+                Files.createDirectories(tablePath);
+            } catch (IOException e) {
+                throw new StorageException("Failed to create a directory for the table storage", e);
+            }
 
-        List<ColumnFamilyDescriptor> cfDescriptors = getExistingCfDescriptors();
+            List<ColumnFamilyDescriptor> cfDescriptors = getExistingCfDescriptors();
 
-        List<ColumnFamilyHandle> cfHandles = new ArrayList<>(cfDescriptors.size());
+            List<ColumnFamilyHandle> cfHandles = new ArrayList<>(cfDescriptors.size());
 
-        DBOptions dbOptions = new DBOptions()
-                .setCreateIfMissing(true)
-                .setCreateMissingColumnFamilies(true)
-                // Atomic flush must be enabled to guarantee consistency between different column families when WAL is disabled.
-                .setAtomicFlush(true)
-                .setListeners(List.of(flusher.listener()))
-                .setWriteBufferManager(dataRegion.writeBufferManager());
+            DBOptions dbOptions = new DBOptions()
+                    .setCreateIfMissing(true)
+                    .setCreateMissingColumnFamilies(true)
+                    // Atomic flush must be enabled to guarantee consistency between different column families when WAL is disabled.
+                    .setAtomicFlush(true)
+                    .setListeners(List.of(flusher.listener()))
+                    .setWriteBufferManager(dataRegion.writeBufferManager());
 
-        try {
-            db = RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles);
+            try {
+                db = RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles);
 
-            // read all existing Column Families from the db and parse them according to type: meta, partition data or index.
-            for (ColumnFamilyHandle cfHandle : cfHandles) {
-                ColumnFamily cf = ColumnFamily.wrap(db, cfHandle);
+                // read all existing Column Families from the db and parse them according to type: meta, partition data or index.
+                for (ColumnFamilyHandle cfHandle : cfHandles) {
+                    ColumnFamily cf = ColumnFamily.wrap(db, cfHandle);
 
-                switch (ColumnFamilyType.fromCfName(cf.name())) {
-                    case META:
-                        meta = new RocksDbMetaStorage(cf);
+                    switch (ColumnFamilyType.fromCfName(cf.name())) {
+                        case META:
+                            meta = new RocksDbMetaStorage(cf);
 
-                        break;
+                            break;
 
-                    case PARTITION:
-                        partitionCf = cf;
+                        case PARTITION:
+                            partitionCf = cf;
 
-                        break;
+                            break;
 
-                    case HASH_INDEX:
-                        hashIndexCf = cf;
+                        case HASH_INDEX:
+                            hashIndexCf = cf;
 
-                        break;
+                            break;
 
-                    case SORTED_INDEX:
-                        UUID indexId = sortedIndexId(cf.name());
+                        case SORTED_INDEX:
+                            UUID indexId = sortedIndexId(cf.name());
 
-                        var indexDescriptor = new SortedIndexDescriptor(indexId, tablesCfg.value());
+                            var indexDescriptor = new SortedIndexDescriptor(indexId, tablesCfg.value());
 
-                        sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
+                            sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
 
-                        break;
+                            break;
 
-                    default:
-                        throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
-                                + tableCfg.value().name() + ']');
+                        default:
+                            throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
+                                    + tableCfg.value().name() + ']');
+                    }
                 }
-            }
 
-            assert meta != null;
-            assert partitionCf != null;
-            assert hashIndexCf != null;
+                assert meta != null;
+                assert partitionCf != null;
+                assert hashIndexCf != null;
 
-            flusher.init(db, cfHandles);
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to initialize RocksDB instance", e);
-        }
+                flusher.init(db, cfHandles);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to initialize RocksDB instance", e);
+            }
 
-        partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
+            partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
 
-        for (int partId : meta.getPartitionIds()) {
-            partitions.set(partId, new RocksDbMvPartitionStorage(this, partId));
-        }
+            for (int partId : meta.getPartitionIds()) {
+                partitions.set(partId, new RocksDbMvPartitionStorage(this, partId));
+            }
+        });
     }
 
     /**
@@ -277,7 +280,7 @@ public class RocksDbTableStorage implements MvTableStorage {
      * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future.
      */
     public CompletableFuture<Void> awaitFlush(boolean schedule) {
-        return flusher.awaitFlush(schedule);
+        return inBusyLock(busyLock, () -> flusher.awaitFlush(schedule));
     }
 
     private void refreshPersistedIndexes() {
@@ -289,7 +292,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             TableView tableCfgView = configuration().value();
 
             for (int partitionId = 0; partitionId < tableCfgView.partitions(); partitionId++) {
-                RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
+                RocksDbMvPartitionStorage partition = getMvPartitionBusy(partitionId);
 
                 if (partition != null) {
                     try {
@@ -371,23 +374,29 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public RocksDbMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
-        RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage partition = getMvPartitionBusy(partitionId);
 
-        if (partition != null) {
-            return partition;
-        }
+            if (partition != null) {
+                return partition;
+            }
 
-        partition = new RocksDbMvPartitionStorage(this, partitionId);
+            partition = new RocksDbMvPartitionStorage(this, partitionId);
 
-        partitions.set(partitionId, partition);
+            partitions.set(partitionId, partition);
 
-        meta.putPartitionId(partitionId);
+            meta.putPartitionId(partitionId);
 
-        return partition;
+            return partition;
+        });
     }
 
     @Override
     public @Nullable RocksDbMvPartitionStorage getMvPartition(int partitionId) {
+        return inBusyLock(busyLock, () -> getMvPartitionBusy(partitionId));
+    }
+
+    private @Nullable RocksDbMvPartitionStorage getMvPartitionBusy(int partitionId) {
         checkPartitionId(partitionId);
 
         return partitions.get(partitionId);
@@ -395,69 +404,73 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public CompletableFuture<Void> destroyPartition(int partitionId) {
-        checkPartitionId(partitionId);
+        return inBusyLock(busyLock, () -> {
+            checkPartitionId(partitionId);
 
-        CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
+            CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
 
-        CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
-                partitionId,
-                destroyPartitionFuture
-        );
+            CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
+                    partitionId,
+                    destroyPartitionFuture
+            );
 
-        if (previousDestroyPartitionFuture != null) {
-            return previousDestroyPartitionFuture;
-        }
+            if (previousDestroyPartitionFuture != null) {
+                return previousDestroyPartitionFuture;
+            }
 
-        RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
+            RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
 
-        if (mvPartition != null) {
-            try (WriteBatch writeBatch = new WriteBatch()) {
-                mvPartition.close();
+            if (mvPartition != null) {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    mvPartition.close();
 
-                // Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the
-                // RocksDB itself will then destroy the data on flash.
-                mvPartition.destroyData(writeBatch);
+                    // Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the
+                    // RocksDB itself will then destroy the data on flash.
+                    mvPartition.destroyData(writeBatch);
 
-                for (HashIndex hashIndex : hashIndices.values()) {
-                    hashIndex.destroy(partitionId, writeBatch);
-                }
+                    for (HashIndex hashIndex : hashIndices.values()) {
+                        hashIndex.destroy(partitionId, writeBatch);
+                    }
 
-                for (SortedIndex sortedIndex : sortedIndices.values()) {
-                    sortedIndex.destroy(partitionId, writeBatch);
-                }
+                    for (SortedIndex sortedIndex : sortedIndices.values()) {
+                        sortedIndex.destroy(partitionId, writeBatch);
+                    }
 
-                db.write(writeOptions, writeBatch);
+                    db.write(writeOptions, writeBatch);
 
-                CompletableFuture<?> flushFuture = awaitFlush(true);
+                    CompletableFuture<?> flushFuture = awaitFlush(true);
 
-                flushFuture.whenComplete((unused, throwable) -> {
-                    if (throwable == null) {
-                        partitionIdDestroyFutureMap.remove(partitionId).complete(null);
-                    } else {
-                        partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
-                    }
-                });
-            } catch (Throwable throwable) {
-                partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+                    flushFuture.whenComplete((unused, throwable) -> {
+                        if (throwable == null) {
+                            partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+                        } else {
+                            partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+                        }
+                    });
+                } catch (Throwable throwable) {
+                    partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+                }
+            } else {
+                partitionIdDestroyFutureMap.remove(partitionId).complete(null);
             }
-        } else {
-            partitionIdDestroyFutureMap.remove(partitionId).complete(null);
-        }
 
-        return destroyPartitionFuture;
+            return destroyPartitionFuture;
+        });
     }
 
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
-        SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
+        return inBusyLock(busyLock, () -> {
+            SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
 
-        RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+            RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
 
-        if (partitionStorage == null) {
-            throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
-        }
+            if (partitionStorage == null) {
+                throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+            }
 
-        return storages.getOrCreateStorage(partitionStorage);
+            return storages.getOrCreateStorage(partitionStorage);
+        });
     }
 
     private SortedIndex createSortedIndex(UUID indexId) {
@@ -479,45 +492,49 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
-        HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
-            var indexDescriptor = new HashIndexDescriptor(indexId, tablesCfg.value());
+        return inBusyLock(busyLock, () -> {
+            HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
+                var indexDescriptor = new HashIndexDescriptor(indexId, tablesCfg.value());
 
-            return new HashIndex(hashIndexCf, indexDescriptor);
-        });
+                return new HashIndex(hashIndexCf, indexDescriptor);
+            });
 
-        RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+            RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
 
-        if (partitionStorage == null) {
-            throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
-        }
+            if (partitionStorage == null) {
+                throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+            }
 
-        return storages.getOrCreateStorage(partitionStorage);
+            return storages.getOrCreateStorage(partitionStorage);
+        });
     }
 
     @Override
     public CompletableFuture<Void> destroyIndex(UUID indexId) {
-        HashIndex hashIdx = hashIndices.remove(indexId);
+        return inBusyLock(busyLock, () -> {
+            HashIndex hashIdx = hashIndices.remove(indexId);
 
-        if (hashIdx != null) {
-            hashIdx.destroy();
-        }
+            if (hashIdx != null) {
+                hashIdx.destroy();
+            }
 
-        // Sorted Indexes have a separate Column Family per index, so we simply destroy it immediately after a flush completes
-        // in order to avoid concurrent access to the CF.
-        SortedIndex sortedIdx = sortedIndices.remove(indexId);
+            // Sorted Indexes have a separate Column Family per index, so we simply destroy it immediately after a flush completes
+            // in order to avoid concurrent access to the CF.
+            SortedIndex sortedIdx = sortedIndices.remove(indexId);
 
-        if (sortedIdx != null) {
-            // Remove the to-be destroyed CF from the flusher
-            flusher.removeColumnFamily(sortedIdx.indexCf().handle());
+            if (sortedIdx != null) {
+                // Remove the to-be destroyed CF from the flusher
+                flusher.removeColumnFamily(sortedIdx.indexCf().handle());
 
-            sortedIdx.destroy();
-        }
+                sortedIdx.destroy();
+            }
 
-        if (hashIdx == null) {
-            return completedFuture(null);
-        } else {
-            return awaitFlush(false);
-        }
+            if (hashIdx == null) {
+                return completedFuture(null);
+            } else {
+                return awaitFlush(false);
+            }
+        });
     }
 
     @Override
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index ce14828c2c..cb43a95e52 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -117,93 +118,89 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
-
-        byte[] rangeStart = rocksPrefix(key);
-
-        byte[] rangeEnd = incrementArray(rangeStart);
+        return busy(() -> {
+            byte[] rangeStart = rocksPrefix(key);
 
-        Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
+            byte[] rangeEnd = incrementArray(rangeStart);
 
-        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
+            Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
 
-        RocksIterator it = indexCf.newIterator(options);
+            ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
 
-        it.seek(rangeStart);
+            RocksIterator it = indexCf.newIterator(options);
 
-        busyLock.leaveBusy();
+            it.seek(rangeStart);
 
-        return new BusyRocksIteratorAdapter<>(busyLock, it) {
-            @Override
-            protected void handleBusyFail() {
-                throw new StorageClosedException();
-            }
+            return new BusyRocksIteratorAdapter<RowId>(busyLock, it) {
+                @Override
+                protected void handleBusyFail() {
+                    throw new StorageClosedException();
+                }
 
-            @Override
-            protected RowId decodeEntry(byte[] key, byte[] value) {
-                // RowId UUID is located at the last 16 bytes of the key
-                long mostSignificantBits = bytesToLong(key, key.length - Long.BYTES * 2);
-                long leastSignificantBits = bytesToLong(key, key.length - Long.BYTES);
+                @Override
+                protected RowId decodeEntry(byte[] key, byte[] value) {
+                    // RowId UUID is located at the last 16 bytes of the key
+                    long mostSignificantBits = bytesToLong(key, key.length - Long.BYTES * 2);
+                    long leastSignificantBits = bytesToLong(key, key.length - Long.BYTES);
 
-                return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
-            }
+                    return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
+                }
 
-            @Override
-            public void close() {
-                super.close();
+                @Override
+                public void close() {
+                    super.close();
 
-                RocksUtils.closeAll(options, upperBound);
-            }
-        };
+                    RocksUtils.closeAll(options, upperBound);
+                }
+            };
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+        busy(() -> {
+            try {
+                WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
-        try {
-            WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+                writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
 
-            writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+        busy(() -> {
+            try {
+                WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
-        try {
-            WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+                writeBatch.delete(indexCf.handle(), rocksKey(row));
 
-            writeBatch.delete(indexCf.handle(), rocksKey(row));
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
+            }
+        });
     }
 
     @Override
     public void destroy() {
-        byte[] rangeEnd = incrementArray(constantPrefix);
+        busy(() -> {
+            byte[] rangeEnd = incrementArray(constantPrefix);
 
-        assert rangeEnd != null;
+            assert rangeEnd != null;
 
-        try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
-            indexCf.db().deleteRange(indexCf.handle(), writeOptions, constantPrefix, rangeEnd);
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
-        }
+            try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
+                indexCf.db().deleteRange(indexCf.handle(), writeOptions, constantPrefix, rangeEnd);
+
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
+            }
+        });
     }
 
     private byte[] rocksPrefix(BinaryTuple prefix) {
@@ -253,4 +250,16 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
 
         writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
     }
+
+    private <V> V busy(Supplier<V> supplier) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            return supplier.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 99fc1b84dc..a66568f2cd 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -103,67 +104,51 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
-
-        BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
+        return busy(() -> {
+            BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
 
-        try {
             return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
-        } finally {
-            busyLock.leaveBusy();
-        }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+        busy(() -> {
+            try {
+                WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
-        try {
-            WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+                writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
 
-            writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+        busy(() -> {
+            try {
+                WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
 
-        try {
-            WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+                writeBatch.delete(indexCf.handle(), rocksKey(row));
 
-            writeBatch.delete(indexCf.handle(), rocksKey(row));
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
+            }
+        });
     }
 
     @Override
     public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+        return busy(() -> {
+            boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+            boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
 
-        boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
-        boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
-        try {
             return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
-        } finally {
-            busyLock.leaveBusy();
-        }
+        });
     }
 
     private <T> PeekCursor<T> scan(
@@ -230,26 +215,16 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
             @Override
             public boolean hasNext() {
-                if (!busyLock.enterBusy()) {
-                    throw new StorageClosedException();
-                }
-
-                try {
+                return busy(() -> {
                     advanceIfNeeded();
 
                     return hasNext;
-                } finally {
-                    busyLock.leaveBusy();
-                }
+                });
             }
 
             @Override
             public T next() {
-                if (!busyLock.enterBusy()) {
-                    throw new StorageClosedException();
-                }
-
-                try {
+                return busy(() -> {
                     advanceIfNeeded();
 
                     boolean hasNext = this.hasNext;
@@ -261,18 +236,12 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
                     this.hasNext = null;
 
                     return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
-                } finally {
-                    busyLock.leaveBusy();
-                }
+                });
             }
 
             @Override
             public @Nullable T peek() {
-                if (!busyLock.enterBusy()) {
-                    throw new StorageClosedException();
-                }
-
-                try {
+                return busy(() -> {
                     if (hasNext != null) {
                         if (hasNext) {
                             return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
@@ -290,9 +259,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
                     } else {
                         return mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
                     }
-                } finally {
-                    busyLock.leaveBusy();
-                }
+                });
             }
 
             private void advanceIfNeeded() throws StorageException {
@@ -417,4 +384,16 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
         writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
     }
+
+    private <V> V busy(Supplier<V> supplier) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
+
+        try {
+            return supplier.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 }