You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/25 11:05:53 UTC

[ignite-3] branch main updated: IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 01f30d0d0d IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)
01f30d0d0d is described below

commit 01f30d0d0dfaec356cbdf4cf5c02165f75af8e83
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 25 14:05:48 2023 +0300

    IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)
---
 .../raft/snapshot/PartitionAccessImpl.java         |   2 +-
 .../state/rocksdb/TxStateRocksDbStorage.java       | 501 +++++++++------------
 .../state/rocksdb/RocksDbTxStateStorageTest.java   |   6 +-
 .../storage/state/AbstractTxStateStorageTest.java  |   2 +-
 4 files changed, 223 insertions(+), 288 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 4748a541da..431a8c413c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -167,7 +167,7 @@ public class PartitionAccessImpl implements PartitionAccess {
 
     @Override
     public CompletableFuture<Void> startRebalance() {
-        // TODO: IGNITE-18619 Fix this bullshit, we should have already waited for the indexes to be created
+        // TODO: IGNITE-18619 Fix it, we should have already waited for the indexes to be created
         indexes.get();
 
         TxStateStorage txStateStorage = getTxStateStorage(partitionId());
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 6d3200f278..5aa7ddfa37 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -26,8 +26,6 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_E
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_REBALANCE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Objects;
@@ -35,7 +33,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
@@ -58,24 +58,6 @@ import org.rocksdb.WriteOptions;
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
-    private static final VarHandle STATE;
-
-    private static final VarHandle REBALANCE_FUTURE;
-
-    static {
-        try {
-            STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);
-
-            REBALANCE_FUTURE = MethodHandles.lookup().findVarHandle(
-                    TxStateRocksDbStorage.class,
-                    "rebalanceFuture",
-                    CompletableFuture.class
-            );
-        } catch (ReflectiveOperationException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
-
     /** RocksDB database. */
     private final RocksDB db;
 
@@ -113,11 +95,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     private volatile long persistedIndex;
 
     /** Current state of the storage. */
-    private volatile StorageState state = StorageState.RUNNABLE;
-
-    @Nullable
-    @SuppressWarnings("unused")
-    private volatile CompletableFuture<Void> rebalanceFuture;
+    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
 
     /**
      * The constructor.
@@ -151,170 +129,149 @@ public class TxStateRocksDbStorage implements TxStateStorage {
      * @throws IgniteInternalException In case when the operation has failed.
      */
     public void start() {
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-
-        if (indexAndTermBytes != null) {
-            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
-
-            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
-                try (WriteBatch writeBatch = new WriteBatch()) {
-                    writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-                    writeBatch.delete(lastAppliedIndexAndTermKey);
-
-                    db.write(writeOptions, writeBatch);
-                } catch (Exception e) {
-                    throw new IgniteInternalException(
-                            TX_STATE_STORAGE_REBALANCE_ERR,
-                            IgniteStringFormatter.format(
-                                    "Failed to clear storage for partition {} of table {}",
-                                    partitionId,
-                                    getTableName()
-                            ),
-                            e
-                    );
-                }
-            } else {
-                this.lastAppliedIndex = lastAppliedIndex;
-                persistedIndex = lastAppliedIndex;
+        busy(() -> {
+            byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+            if (indexAndTermBytes != null) {
+                long lastAppliedIndex = bytesToLong(indexAndTermBytes);
 
-                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+                if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format("Failed to clear storage: [{}]", createStorageInfo()),
+                                e
+                        );
+                    }
+                } else {
+                    this.lastAppliedIndex = lastAppliedIndex;
+                    persistedIndex = lastAppliedIndex;
+
+                    lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+                }
             }
-        }
+
+            return null;
+        });
     }
 
     @Override
-    @Nullable
-    public TxMeta get(UUID txId) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
-            throwExceptionIfStorageInProgressOfRebalance();
+    public @Nullable TxMeta get(UUID txId) {
+        return busy(() -> {
+            try {
+                throwExceptionIfStorageInProgressOfRebalance();
 
-            byte[] txMetaBytes = db.get(txIdToKey(txId));
+                byte[] txMetaBytes = db.get(txIdToKey(txId));
 
-            return txMetaBytes == null ? null : fromBytes(txMetaBytes);
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                    TX_STATE_STORAGE_ERR,
-                    "Failed to get a value from the transaction state storage, partition " + partitionId
-                            + " of table " + tableStorage.configuration().value().name(),
-                    e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return txMetaBytes == null ? null : fromBytes(txMetaBytes);
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to get a value from storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public void put(UUID txId, TxMeta txMeta) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
-            db.put(txIdToKey(txId), toBytes(txMeta));
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                TX_STATE_STORAGE_ERR,
-                "Failed to put a value into the transaction state storage, partition " + partitionId
-                    + " of table " + tableStorage.configuration().value().name(),
-                e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+        busy(() -> {
+            try {
+                db.put(txIdToKey(txId), toBytes(txMeta));
+
+                return null;
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to put a value into storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex, long commandTerm) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            byte[] txIdBytes = txIdToKey(txId);
+        return busy(() -> {
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                byte[] txIdBytes = txIdToKey(txId);
 
-            byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
+                byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
 
-            boolean result;
+                boolean result;
 
-            if (txMetaExistingBytes == null && txStateExpected == null) {
-                writeBatch.put(txIdBytes, toBytes(txMeta));
+                if (txMetaExistingBytes == null && txStateExpected == null) {
+                    writeBatch.put(txIdBytes, toBytes(txMeta));
 
-                result = true;
-            } else {
-                if (txMetaExistingBytes != null) {
-                    TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
+                    result = true;
+                } else {
+                    if (txMetaExistingBytes != null) {
+                        TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
 
-                    if (txMetaExisting.txState() == txStateExpected) {
-                        writeBatch.put(txIdBytes, toBytes(txMeta));
+                        if (txMetaExisting.txState() == txStateExpected) {
+                            writeBatch.put(txIdBytes, toBytes(txMeta));
 
-                        result = true;
+                            result = true;
+                        } else {
+                            result = txMetaExisting.txState() == txMeta.txState()
+                                    && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+                        }
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState()
-                                && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+                        result = false;
                     }
-                } else {
-                    result = false;
                 }
-            }
 
-            // If the store is in the process of rebalancing, then there is no need to update index lastAppliedIndex and lastAppliedTerm.
-            // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will have
-            // non-consistent storage. They will be updated to either #abortRebalance() or #finishRebalance(long, long).
-            if (state != StorageState.REBALANCE) {
-                writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+                // While the storage is being rebalanced, lastAppliedIndex and lastAppliedTerm must not be updated here. Otherwise, if the
+                // node is restarted in the middle of the rebalance, the storage would be left in an inconsistent state. Both values are
+                // updated by either #abortRebalance() or #finishRebalance(long, long).
+                if (state.get() != StorageState.REBALANCE) {
+                    writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
 
-                lastAppliedIndex = commandIndex;
-                lastAppliedTerm = commandTerm;
-            }
+                    lastAppliedIndex = commandIndex;
+                    lastAppliedTerm = commandTerm;
+                }
 
-            db.write(writeOptions, writeBatch);
+                db.write(writeOptions, writeBatch);
 
-            return result;
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                TX_STATE_STORAGE_ERR,
-                "Failed perform CAS operation over a value in transaction state storage, partition " + partitionId
-                        + " of table " + tableStorage.configuration().value().name(),
-                e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return result;
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed perform CAS operation over a value in storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public void remove(UUID txId) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
+        busy(() -> {
+            try {
+                throwExceptionIfStorageInProgressOfRebalance();
 
-        try {
-            throwExceptionIfStorageInProgressOfRebalance();
+                db.delete(txIdToKey(txId));
 
-            db.delete(txIdToKey(txId));
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                TX_STATE_STORAGE_ERR,
-                "Failed to remove a value from the transaction state storage, partition " + partitionId
-                    + " of table " + tableStorage.configuration().value().name(),
-                e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return null;
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to remove a value from storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
+        return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance();
 
             byte[] lowerBound = ByteBuffer.allocate(Short.BYTES + 1).putShort((short) partitionId).put((byte) 0).array();
@@ -336,7 +293,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
                 throw e;
             }
 
-            return new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+            return new RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>>(rocksIterator) {
                 @Override
                 protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[] keyBytes, byte[] valueBytes) {
                     UUID key = keyToTxId(keyBytes);
@@ -346,13 +303,21 @@ public class TxStateRocksDbStorage implements TxStateStorage {
                 }
 
                 @Override
-                protected void handleBusyFail() {
-                    throwExceptionIfStorageClosedOrRebalance();
+                public boolean hasNext() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance();
+
+                        return super.hasNext();
+                    });
                 }
 
                 @Override
-                protected void handeBusySuccess() {
-                    throwExceptionIfStorageInProgressOfRebalance();
+                public IgniteBiTuple<UUID, TxMeta> next() {
+                    return busy(() -> {
+                        throwExceptionIfStorageInProgressOfRebalance();
+
+                        return super.next();
+                    });
                 }
 
                 @Override
@@ -362,14 +327,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
                     super.close();
                 }
             };
-        } finally {
-            busyLock.leaveBusy();
-        }
+        });
     }
 
     @Override
     public CompletableFuture<Void> flush() {
-        return tableStorage.awaitFlush(true);
+        return busy(() -> tableStorage.awaitFlush(true));
     }
 
     @Override
@@ -384,27 +347,24 @@ public class TxStateRocksDbStorage implements TxStateStorage {
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
-            throwExceptionIfStorageInProgressOfRebalance();
+        busy(() -> {
+            try {
+                throwExceptionIfStorageInProgressOfRebalance();
 
-            db.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+                db.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
 
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                    TX_STATE_STORAGE_ERR,
-                    "Failed to write applied index value to transaction state storage, partition " + partitionId
-                            + " of table " + tableStorage.configuration().value().name(),
-                    e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+                this.lastAppliedIndex = lastAppliedIndex;
+                this.lastAppliedTerm = lastAppliedTerm;
+
+                return null;
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to write applied index value to storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     private static byte[] indexAndTermToBytes(long lastAppliedIndex, long lastAppliedTerm) {
@@ -455,8 +415,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         } catch (RocksDBException e) {
             throw new IgniteInternalException(
                     TX_STATE_STORAGE_ERR,
-                    "Failed to read applied term value from transaction state storage, partition " + partitionId
-                            + " of table " + tableStorage.configuration().value().name(),
+                    IgniteStringFormatter.format("Failed to read applied term value from storage: [{}]", createStorageInfo()),
                     e
             );
         }
@@ -477,7 +436,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         } catch (Exception e) {
             throw new IgniteInternalException(
                     TX_STATE_STORAGE_ERR,
-                    "Failed to destroy partition " + partitionId + " of table " + tableStorage.configuration().name(),
+                    IgniteStringFormatter.format("Failed to destroy storage: [{}]", createStorageInfo()),
                     e
             );
         }
@@ -517,20 +476,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
 
     @Override
     public CompletableFuture<Void> startRebalance() {
-        CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
-
-        if (!REBALANCE_FUTURE.compareAndSet(this, null, rebalanceFuture)) {
-            throw createStorageInProgressOfRebalanceException();
-        }
-
-        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
-            IgniteInternalException exception = createExceptionIfStorageClosedOrRebalance();
-
-            this.rebalanceFuture = null;
-
-            rebalanceFuture.completeExceptionally(exception);
-
-            throw exception;
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageState();
         }
 
         busyLock.block();
@@ -545,23 +492,13 @@ public class TxStateRocksDbStorage implements TxStateStorage {
             lastAppliedTerm = REBALANCE_IN_PROGRESS;
             persistedIndex = REBALANCE_IN_PROGRESS;
 
-            rebalanceFuture.complete(null);
-
-            return rebalanceFuture;
+            return completedFuture(null);
         } catch (Exception e) {
-            IgniteInternalException exception = new IgniteInternalException(
+            throw new IgniteInternalException(
                     TX_STATE_STORAGE_REBALANCE_ERR,
-                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    IgniteStringFormatter.format("Failed to start rebalance: [{}]", createStorageInfo()),
                     e
             );
-
-            this.rebalanceFuture = null;
-
-            state = StorageState.RUNNABLE;
-
-            rebalanceFuture.completeExceptionally(exception);
-
-            throw exception;
         } finally {
             busyLock.unblock();
         }
@@ -569,71 +506,60 @@ public class TxStateRocksDbStorage implements TxStateStorage {
 
     @Override
     public CompletableFuture<Void> abortRebalance() {
-        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
-
-        if (rebalanceFuture == null) {
+        if (state.get() != StorageState.REBALANCE) {
             return completedFuture(null);
         }
 
-        return rebalanceFuture
-                .thenAccept(unused -> {
-                    try (WriteBatch writeBatch = new WriteBatch()) {
-                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-                        writeBatch.delete(lastAppliedIndexAndTermKey);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.delete(lastAppliedIndexAndTermKey);
 
-                        db.write(writeOptions, writeBatch);
+            db.write(writeOptions, writeBatch);
 
-                        lastAppliedIndex = 0;
-                        lastAppliedTerm = 0;
-                        persistedIndex = 0;
+            lastAppliedIndex = 0;
+            lastAppliedTerm = 0;
+            persistedIndex = 0;
 
-                        state = StorageState.RUNNABLE;
-                    } catch (Exception e) {
-                        throw new IgniteInternalException(
-                                TX_STATE_STORAGE_REBALANCE_ERR,
-                                IgniteStringFormatter.format(
-                                        "Failed to clear storage for partition {} of table {}",
-                                        partitionId,
-                                        getTableName()
-                                ),
-                                e
-                        );
-                    }
-                });
+            state.set(StorageState.RUNNABLE);
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to abort rebalance: [{}]", createStorageInfo()),
+                    e
+            );
+        }
+
+        return completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
-
-        if (rebalanceFuture == null) {
-            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        if (state.get() != StorageState.REBALANCE) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Rebalancing has not started: [{}]", createStorageInfo())
+            );
         }
 
-        return rebalanceFuture
-                .thenAccept(unused -> {
-                    try (WriteBatch writeBatch = new WriteBatch()) {
-                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
 
-                        db.write(writeOptions, writeBatch);
+            db.write(writeOptions, writeBatch);
 
-                        this.lastAppliedIndex = lastAppliedIndex;
-                        this.lastAppliedTerm = lastAppliedTerm;
-                        this.persistedIndex = lastAppliedIndex;
+            this.lastAppliedIndex = lastAppliedIndex;
+            this.lastAppliedTerm = lastAppliedTerm;
+            this.persistedIndex = lastAppliedIndex;
 
-                        state = StorageState.RUNNABLE;
-                    } catch (Exception e) {
-                        throw new IgniteInternalException(
-                                TX_STATE_STORAGE_REBALANCE_ERR,
-                                IgniteStringFormatter.format(
-                                        "Failed to finish rebalance for partition {} of table {}",
-                                        partitionId,
-                                        getTableName()
-                                ),
-                                e
-                        );
-                    }
-                });
+            state.set(StorageState.RUNNABLE);
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to finish rebalance: [{}]", createStorageInfo()),
+                    e
+            );
+        }
+
+        return completedFuture(null);
     }
 
     /**
@@ -642,8 +568,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
      * @return {@code True} if the storage was successfully closed, otherwise the storage has already been closed.
      */
     private boolean tryToCloseStorageAndResources() {
-        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.CLOSED)) {
-            StorageState state = this.state;
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
 
             assert state == StorageState.CLOSED : state;
 
@@ -659,43 +585,52 @@ public class TxStateRocksDbStorage implements TxStateStorage {
         return true;
     }
 
-    private void throwExceptionIfStorageClosedOrRebalance() {
-        throw createExceptionIfStorageClosedOrRebalance();
-    }
-
     private void throwExceptionIfStorageInProgressOfRebalance() {
-        if (state == StorageState.REBALANCE) {
+        if (state.get() == StorageState.REBALANCE) {
             throw createStorageInProgressOfRebalanceException();
         }
     }
 
-    private IgniteInternalException createStorageClosedException() {
-        return new IgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, "Transaction state storage is stopped");
-    }
-
     private IgniteInternalException createStorageInProgressOfRebalanceException() {
-        return new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Storage is in the process of being rebalanced");
+        return new IgniteInternalException(
+                TX_STATE_STORAGE_REBALANCE_ERR,
+                IgniteStringFormatter.format("Storage is in the process of rebalance: [{}]", createStorageInfo())
+        );
     }
 
-    private IgniteInternalException createUnexpectedStorageStateException(StorageState state) {
-        return new IgniteInternalException(TX_STATE_STORAGE_ERR, "Unexpected state: " + state);
-    }
-
-    private IgniteInternalException createExceptionIfStorageClosedOrRebalance() {
-        StorageState state = this.state;
+    private void throwExceptionDependingOnStorageState() {
+        StorageState state = this.state.get();
 
         switch (state) {
             case CLOSED:
-                return createStorageClosedException();
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_STOPPED_ERR,
+                        IgniteStringFormatter.format("Transaction state storage is stopped: [{}]", createStorageInfo())
+                );
             case REBALANCE:
-                return createStorageInProgressOfRebalanceException();
+                throw createStorageInProgressOfRebalanceException();
             default:
-                return createUnexpectedStorageStateException(state);
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Unexpected state: [{}, state={}]", createStorageInfo(), state)
+                );
         }
     }
 
-    private String getTableName() {
-        return tableStorage.configuration().name().value();
+    private String createStorageInfo() {
+        return "table=" + tableStorage.configuration().name().value() + ", partitionId=" + partitionId;
+    }
+
+    private <V> V busy(Supplier<V> supplier) {
+        if (!busyLock.enterBusy()) {
+            throwExceptionDependingOnStorageState();
+        }
+
+        try {
+            return supplier.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index dcd239f043..1b62eea05d 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx.storage.state.rocksdb;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 
@@ -66,8 +67,6 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
     void testRestartStorageInProgressOfRebalance() throws Exception {
         TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
 
-        storage.startRebalance().get(1, TimeUnit.SECONDS);
-
         fillStorage(
                 storage,
                 List.of(randomTxMetaTuple(1, UUID.randomUUID()), randomTxMetaTuple(1, UUID.randomUUID()))
@@ -75,7 +74,8 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
 
         storage.flush().get(10, TimeUnit.SECONDS);
 
-        storage.finishRebalance(10, 15).get(1, TimeUnit.SECONDS);
+        // Emulate a rebalance that did not have time to finish before the storage was stopped.
+        storage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
 
         tableStorage.stop();
 
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 091c41c3d7..3580d589e4 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -288,7 +288,7 @@ public abstract class AbstractTxStateStorageTest {
     }
 
     @Test
-    public void testSuccessRebalance() throws Exception {
+    public void testSuccessRebalance() {
         TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
 
         // We can't finish rebalance that we haven't started.