You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/25 11:05:53 UTC
[ignite-3] branch main updated: IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 01f30d0d0d IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)
01f30d0d0d is described below
commit 01f30d0d0dfaec356cbdf4cf5c02165f75af8e83
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 25 14:05:48 2023 +0300
IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage (#1574)
---
.../raft/snapshot/PartitionAccessImpl.java | 2 +-
.../state/rocksdb/TxStateRocksDbStorage.java | 501 +++++++++------------
.../state/rocksdb/RocksDbTxStateStorageTest.java | 6 +-
.../storage/state/AbstractTxStateStorageTest.java | 2 +-
4 files changed, 223 insertions(+), 288 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 4748a541da..431a8c413c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -167,7 +167,7 @@ public class PartitionAccessImpl implements PartitionAccess {
@Override
public CompletableFuture<Void> startRebalance() {
- // TODO: IGNITE-18619 Fix this bullshit, we should have already waited for the indexes to be created
+ // TODO: IGNITE-18619 Fix it, we should have already waited for the indexes to be created
indexes.get();
TxStateStorage txStateStorage = getTxStateStorage(partitionId());
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 6d3200f278..5aa7ddfa37 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -26,8 +26,6 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_E
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_REBALANCE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
@@ -35,7 +33,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
@@ -58,24 +58,6 @@ import org.rocksdb.WriteOptions;
* Tx state storage implementation based on RocksDB.
*/
public class TxStateRocksDbStorage implements TxStateStorage {
- private static final VarHandle STATE;
-
- private static final VarHandle REBALANCE_FUTURE;
-
- static {
- try {
- STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);
-
- REBALANCE_FUTURE = MethodHandles.lookup().findVarHandle(
- TxStateRocksDbStorage.class,
- "rebalanceFuture",
- CompletableFuture.class
- );
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
/** RocksDB database. */
private final RocksDB db;
@@ -113,11 +95,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
private volatile long persistedIndex;
/** Current state of the storage. */
- private volatile StorageState state = StorageState.RUNNABLE;
-
- @Nullable
- @SuppressWarnings("unused")
- private volatile CompletableFuture<Void> rebalanceFuture;
+ private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
/**
* The constructor.
@@ -151,170 +129,149 @@ public class TxStateRocksDbStorage implements TxStateStorage {
* @throws IgniteInternalException In case when the operation has failed.
*/
public void start() {
- byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-
- if (indexAndTermBytes != null) {
- long lastAppliedIndex = bytesToLong(indexAndTermBytes);
-
- if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
- try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
- writeBatch.delete(lastAppliedIndexAndTermKey);
-
- db.write(writeOptions, writeBatch);
- } catch (Exception e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_REBALANCE_ERR,
- IgniteStringFormatter.format(
- "Failed to clear storage for partition {} of table {}",
- partitionId,
- getTableName()
- ),
- e
- );
- }
- } else {
- this.lastAppliedIndex = lastAppliedIndex;
- persistedIndex = lastAppliedIndex;
+ busy(() -> {
+ byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+ if (indexAndTermBytes != null) {
+ long lastAppliedIndex = bytesToLong(indexAndTermBytes);
- lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+ if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to clear storage:", createStorageInfo()),
+ e
+ );
+ }
+ } else {
+ this.lastAppliedIndex = lastAppliedIndex;
+ persistedIndex = lastAppliedIndex;
+
+ lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+ }
}
- }
+
+ return null;
+ });
}
@Override
- @Nullable
- public TxMeta get(UUID txId) {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
-
- try {
- throwExceptionIfStorageInProgressOfRebalance();
+ public @Nullable TxMeta get(UUID txId) {
+ return busy(() -> {
+ try {
+ throwExceptionIfStorageInProgressOfRebalance();
- byte[] txMetaBytes = db.get(txIdToKey(txId));
+ byte[] txMetaBytes = db.get(txIdToKey(txId));
- return txMetaBytes == null ? null : fromBytes(txMetaBytes);
- } catch (RocksDBException e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_ERR,
- "Failed to get a value from the transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
- e
- );
- } finally {
- busyLock.leaveBusy();
- }
+ return txMetaBytes == null ? null : fromBytes(txMetaBytes);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed to get a value from storage: [{}]", createStorageInfo()),
+ e
+ );
+ }
+ });
}
@Override
public void put(UUID txId, TxMeta txMeta) {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
-
- try {
- db.put(txIdToKey(txId), toBytes(txMeta));
- } catch (RocksDBException e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_ERR,
- "Failed to put a value into the transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
- e
- );
- } finally {
- busyLock.leaveBusy();
- }
+ busy(() -> {
+ try {
+ db.put(txIdToKey(txId), toBytes(txMeta));
+
+ return null;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed to put a value into storage: [{}]", createStorageInfo()),
+ e
+ );
+ }
+ });
}
@Override
public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex, long commandTerm) {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
-
- try (WriteBatch writeBatch = new WriteBatch()) {
- byte[] txIdBytes = txIdToKey(txId);
+ return busy(() -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ byte[] txIdBytes = txIdToKey(txId);
- byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
+ byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
- boolean result;
+ boolean result;
- if (txMetaExistingBytes == null && txStateExpected == null) {
- writeBatch.put(txIdBytes, toBytes(txMeta));
+ if (txMetaExistingBytes == null && txStateExpected == null) {
+ writeBatch.put(txIdBytes, toBytes(txMeta));
- result = true;
- } else {
- if (txMetaExistingBytes != null) {
- TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
+ result = true;
+ } else {
+ if (txMetaExistingBytes != null) {
+ TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
- if (txMetaExisting.txState() == txStateExpected) {
- writeBatch.put(txIdBytes, toBytes(txMeta));
+ if (txMetaExisting.txState() == txStateExpected) {
+ writeBatch.put(txIdBytes, toBytes(txMeta));
- result = true;
+ result = true;
+ } else {
+ result = txMetaExisting.txState() == txMeta.txState()
+ && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+ }
} else {
- result = txMetaExisting.txState() == txMeta.txState()
- && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+ result = false;
}
- } else {
- result = false;
}
- }
- // If the store is in the process of rebalancing, then there is no need to update index lastAppliedIndex and lastAppliedTerm.
- // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will have
- // non-consistent storage. They will be updated to either #abortRebalance() or #finishRebalance(long, long).
- if (state != StorageState.REBALANCE) {
- writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+ // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
+ // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
+ // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
+ if (state.get() != StorageState.REBALANCE) {
+ writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
- lastAppliedIndex = commandIndex;
- lastAppliedTerm = commandTerm;
- }
+ lastAppliedIndex = commandIndex;
+ lastAppliedTerm = commandTerm;
+ }
- db.write(writeOptions, writeBatch);
+ db.write(writeOptions, writeBatch);
- return result;
- } catch (RocksDBException e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_ERR,
- "Failed perform CAS operation over a value in transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
- e
- );
- } finally {
- busyLock.leaveBusy();
- }
+ return result;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed perform CAS operation over a value in storage: [{}]", createStorageInfo()),
+ e
+ );
+ }
+ });
}
@Override
public void remove(UUID txId) {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
+ busy(() -> {
+ try {
+ throwExceptionIfStorageInProgressOfRebalance();
- try {
- throwExceptionIfStorageInProgressOfRebalance();
+ db.delete(txIdToKey(txId));
- db.delete(txIdToKey(txId));
- } catch (RocksDBException e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_ERR,
- "Failed to remove a value from the transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
- e
- );
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed to remove a value from storage: [{}]", createStorageInfo()),
+ e
+ );
+ }
+ });
}
@Override
public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
-
- try {
+ return busy(() -> {
throwExceptionIfStorageInProgressOfRebalance();
byte[] lowerBound = ByteBuffer.allocate(Short.BYTES + 1).putShort((short) partitionId).put((byte) 0).array();
@@ -336,7 +293,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
throw e;
}
- return new BusyRocksIteratorAdapter<>(busyLock, rocksIterator) {
+ return new RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>>(rocksIterator) {
@Override
protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[] keyBytes, byte[] valueBytes) {
UUID key = keyToTxId(keyBytes);
@@ -346,13 +303,21 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
@Override
- protected void handleBusyFail() {
- throwExceptionIfStorageClosedOrRebalance();
+ public boolean hasNext() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance();
+
+ return super.hasNext();
+ });
}
@Override
- protected void handeBusySuccess() {
- throwExceptionIfStorageInProgressOfRebalance();
+ public IgniteBiTuple<UUID, TxMeta> next() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance();
+
+ return super.next();
+ });
}
@Override
@@ -362,14 +327,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
super.close();
}
};
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
@Override
public CompletableFuture<Void> flush() {
- return tableStorage.awaitFlush(true);
+ return busy(() -> tableStorage.awaitFlush(true));
}
@Override
@@ -384,27 +347,24 @@ public class TxStateRocksDbStorage implements TxStateStorage {
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
- if (!busyLock.enterBusy()) {
- throwExceptionIfStorageClosedOrRebalance();
- }
-
- try {
- throwExceptionIfStorageInProgressOfRebalance();
+ busy(() -> {
+ try {
+ throwExceptionIfStorageInProgressOfRebalance();
- db.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+ db.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- } catch (RocksDBException e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_ERR,
- "Failed to write applied index value to transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
- e
- );
- } finally {
- busyLock.leaveBusy();
- }
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+
+ return null;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed to write applied index value to storage: [{}]", createStorageInfo()),
+ e
+ );
+ }
+ });
}
private static byte[] indexAndTermToBytes(long lastAppliedIndex, long lastAppliedTerm) {
@@ -455,8 +415,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
} catch (RocksDBException e) {
throw new IgniteInternalException(
TX_STATE_STORAGE_ERR,
- "Failed to read applied term value from transaction state storage, partition " + partitionId
- + " of table " + tableStorage.configuration().value().name(),
+ IgniteStringFormatter.format("Failed to read applied term value from storage: [{}]", createStorageInfo()),
e
);
}
@@ -477,7 +436,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
} catch (Exception e) {
throw new IgniteInternalException(
TX_STATE_STORAGE_ERR,
- "Failed to destroy partition " + partitionId + " of table " + tableStorage.configuration().name(),
+ IgniteStringFormatter.format("Failed to destroy storage: [{}]", createStorageInfo()),
e
);
}
@@ -517,20 +476,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
@Override
public CompletableFuture<Void> startRebalance() {
- CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
-
- if (!REBALANCE_FUTURE.compareAndSet(this, null, rebalanceFuture)) {
- throw createStorageInProgressOfRebalanceException();
- }
-
- if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
- IgniteInternalException exception = createExceptionIfStorageClosedOrRebalance();
-
- this.rebalanceFuture = null;
-
- rebalanceFuture.completeExceptionally(exception);
-
- throw exception;
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+ throwExceptionDependingOnStorageState();
}
busyLock.block();
@@ -545,23 +492,13 @@ public class TxStateRocksDbStorage implements TxStateStorage {
lastAppliedTerm = REBALANCE_IN_PROGRESS;
persistedIndex = REBALANCE_IN_PROGRESS;
- rebalanceFuture.complete(null);
-
- return rebalanceFuture;
+ return completedFuture(null);
} catch (Exception e) {
- IgniteInternalException exception = new IgniteInternalException(
+ throw new IgniteInternalException(
TX_STATE_STORAGE_REBALANCE_ERR,
- IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+ IgniteStringFormatter.format("Failed to start rebalance: [{}]", createStorageInfo()),
e
);
-
- this.rebalanceFuture = null;
-
- state = StorageState.RUNNABLE;
-
- rebalanceFuture.completeExceptionally(exception);
-
- throw exception;
} finally {
busyLock.unblock();
}
@@ -569,71 +506,60 @@ public class TxStateRocksDbStorage implements TxStateStorage {
@Override
public CompletableFuture<Void> abortRebalance() {
- CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
-
- if (rebalanceFuture == null) {
+ if (state.get() != StorageState.REBALANCE) {
return completedFuture(null);
}
- return rebalanceFuture
- .thenAccept(unused -> {
- try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
- writeBatch.delete(lastAppliedIndexAndTermKey);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
- db.write(writeOptions, writeBatch);
+ db.write(writeOptions, writeBatch);
- lastAppliedIndex = 0;
- lastAppliedTerm = 0;
- persistedIndex = 0;
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ persistedIndex = 0;
- state = StorageState.RUNNABLE;
- } catch (Exception e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_REBALANCE_ERR,
- IgniteStringFormatter.format(
- "Failed to clear storage for partition {} of table {}",
- partitionId,
- getTableName()
- ),
- e
- );
- }
- });
+ state.set(StorageState.RUNNABLE);
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to abort rebalance: [{}]", createStorageInfo()),
+ e
+ );
+ }
+
+ return completedFuture(null);
}
@Override
public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
- CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
-
- if (rebalanceFuture == null) {
- throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+ if (state.get() != StorageState.REBALANCE) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Rebalancing has not started: [{}]", createStorageInfo())
+ );
}
- return rebalanceFuture
- .thenAccept(unused -> {
- try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
- db.write(writeOptions, writeBatch);
+ db.write(writeOptions, writeBatch);
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- this.persistedIndex = lastAppliedIndex;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ this.persistedIndex = lastAppliedIndex;
- state = StorageState.RUNNABLE;
- } catch (Exception e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_REBALANCE_ERR,
- IgniteStringFormatter.format(
- "Failed to finish rebalance for partition {} of table {}",
- partitionId,
- getTableName()
- ),
- e
- );
- }
- });
+ state.set(StorageState.RUNNABLE);
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to finish rebalance: [{}]", createStorageInfo()),
+ e
+ );
+ }
+
+ return completedFuture(null);
}
/**
@@ -642,8 +568,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
* @return {@code True} if the storage was successfully closed, otherwise the storage has already been closed.
*/
private boolean tryToCloseStorageAndResources() {
- if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.CLOSED)) {
- StorageState state = this.state;
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+ StorageState state = this.state.get();
assert state == StorageState.CLOSED : state;
@@ -659,43 +585,52 @@ public class TxStateRocksDbStorage implements TxStateStorage {
return true;
}
- private void throwExceptionIfStorageClosedOrRebalance() {
- throw createExceptionIfStorageClosedOrRebalance();
- }
-
private void throwExceptionIfStorageInProgressOfRebalance() {
- if (state == StorageState.REBALANCE) {
+ if (state.get() == StorageState.REBALANCE) {
throw createStorageInProgressOfRebalanceException();
}
}
- private IgniteInternalException createStorageClosedException() {
- return new IgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, "Transaction state storage is stopped");
- }
-
private IgniteInternalException createStorageInProgressOfRebalanceException() {
- return new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Storage is in the process of being rebalanced");
+ return new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Storage is in the process of rebalance: [{}]", createStorageInfo())
+ );
}
- private IgniteInternalException createUnexpectedStorageStateException(StorageState state) {
- return new IgniteInternalException(TX_STATE_STORAGE_ERR, "Unexpected state: " + state);
- }
-
- private IgniteInternalException createExceptionIfStorageClosedOrRebalance() {
- StorageState state = this.state;
+ private void throwExceptionDependingOnStorageState() {
+ StorageState state = this.state.get();
switch (state) {
case CLOSED:
- return createStorageClosedException();
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_STOPPED_ERR,
+ IgniteStringFormatter.format("Transaction state storage is stopped: [{}]", createStorageInfo())
+ );
case REBALANCE:
- return createStorageInProgressOfRebalanceException();
+ throw createStorageInProgressOfRebalanceException();
default:
- return createUnexpectedStorageStateException(state);
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Unexpected state: [{}, state={}]", createStorageInfo(), state)
+ );
}
}
- private String getTableName() {
- return tableStorage.configuration().name().value();
+ private String createStorageInfo() {
+ return "table=" + tableStorage.configuration().name().value() + ", partitionId=" + partitionId;
+ }
+
+ private <V> V busy(Supplier<V> supplier) {
+ if (!busyLock.enterBusy()) {
+ throwExceptionDependingOnStorageState();
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index dcd239f043..1b62eea05d 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -66,8 +67,6 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
void testRestartStorageInProgressOfRebalance() throws Exception {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
- storage.startRebalance().get(1, TimeUnit.SECONDS);
-
fillStorage(
storage,
List.of(randomTxMetaTuple(1, UUID.randomUUID()), randomTxMetaTuple(1, UUID.randomUUID()))
@@ -75,7 +74,8 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
storage.flush().get(10, TimeUnit.SECONDS);
- storage.finishRebalance(10, 15).get(1, TimeUnit.SECONDS);
+ // We emulate the situation that the rebalancing did not have time to end.
+ storage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
tableStorage.stop();
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 091c41c3d7..3580d589e4 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -288,7 +288,7 @@ public abstract class AbstractTxStateStorageTest {
}
@Test
- public void testSuccessRebalance() throws Exception {
+ public void testSuccessRebalance() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
// We can't finish rebalance that we haven't started.