You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/01/25 10:25:45 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1574: IGNITE-18629 Slightly improve busyLock usage in TxStateRocksDbStorage

tkalkirill commented on code in PR #1574:
URL: https://github.com/apache/ignite-3/pull/1574#discussion_r1086457048


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -151,170 +129,149 @@ public class TxStateRocksDbStorage implements TxStateStorage {
      * @throws IgniteInternalException In case when the operation has failed.
      */
     public void start() {
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-
-        if (indexAndTermBytes != null) {
-            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
-
-            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
-                try (WriteBatch writeBatch = new WriteBatch()) {
-                    writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-                    writeBatch.delete(lastAppliedIndexAndTermKey);
-
-                    db.write(writeOptions, writeBatch);
-                } catch (Exception e) {
-                    throw new IgniteInternalException(
-                            TX_STATE_STORAGE_REBALANCE_ERR,
-                            IgniteStringFormatter.format(
-                                    "Failed to clear storage for partition {} of table {}",
-                                    partitionId,
-                                    getTableName()
-                            ),
-                            e
-                    );
-                }
-            } else {
-                this.lastAppliedIndex = lastAppliedIndex;
-                persistedIndex = lastAppliedIndex;
+        busy(() -> {
+            byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+            if (indexAndTermBytes != null) {
+                long lastAppliedIndex = bytesToLong(indexAndTermBytes);
 
-                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+                if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format("Failed to clear storage:", createStorageInfo()),
+                                e
+                        );
+                    }
+                } else {
+                    this.lastAppliedIndex = lastAppliedIndex;
+                    persistedIndex = lastAppliedIndex;
+
+                    lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+                }
             }
-        }
+
+            return null;
+        });
     }
 
     @Override
-    @Nullable
-    public TxMeta get(UUID txId) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
-            throwExceptionIfStorageInProgressOfRebalance();
+    public @Nullable TxMeta get(UUID txId) {
+        return busy(() -> {
+            try {
+                throwExceptionIfStorageInProgressOfRebalance();
 
-            byte[] txMetaBytes = db.get(txIdToKey(txId));
+                byte[] txMetaBytes = db.get(txIdToKey(txId));
 
-            return txMetaBytes == null ? null : fromBytes(txMetaBytes);
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                    TX_STATE_STORAGE_ERR,
-                    "Failed to get a value from the transaction state storage, partition " + partitionId
-                            + " of table " + tableStorage.configuration().value().name(),
-                    e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+                return txMetaBytes == null ? null : fromBytes(txMetaBytes);
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to get a value from storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public void put(UUID txId, TxMeta txMeta) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try {
-            db.put(txIdToKey(txId), toBytes(txMeta));
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(
-                TX_STATE_STORAGE_ERR,
-                "Failed to put a value into the transaction state storage, partition " + partitionId
-                    + " of table " + tableStorage.configuration().value().name(),
-                e
-            );
-        } finally {
-            busyLock.leaveBusy();
-        }
+        busy(() -> {
+            try {
+                db.put(txIdToKey(txId), toBytes(txMeta));
+
+                return null;
+            } catch (RocksDBException e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_ERR,
+                        IgniteStringFormatter.format("Failed to put a value into storage: [{}]", createStorageInfo()),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex, long commandTerm) {
-        if (!busyLock.enterBusy()) {
-            throwExceptionIfStorageClosedOrRebalance();
-        }
-
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            byte[] txIdBytes = txIdToKey(txId);
+        return busy(() -> {
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                byte[] txIdBytes = txIdToKey(txId);
 
-            byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
+                byte[] txMetaExistingBytes = db.get(readOptions, txIdToKey(txId));
 
-            boolean result;
+                boolean result;
 
-            if (txMetaExistingBytes == null && txStateExpected == null) {
-                writeBatch.put(txIdBytes, toBytes(txMeta));
+                if (txMetaExistingBytes == null && txStateExpected == null) {
+                    writeBatch.put(txIdBytes, toBytes(txMeta));
 
-                result = true;
-            } else {
-                if (txMetaExistingBytes != null) {
-                    TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
+                    result = true;
+                } else {
+                    if (txMetaExistingBytes != null) {
+                        TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);
 
-                    if (txMetaExisting.txState() == txStateExpected) {
-                        writeBatch.put(txIdBytes, toBytes(txMeta));
+                        if (txMetaExisting.txState() == txStateExpected) {
+                            writeBatch.put(txIdBytes, toBytes(txMeta));
 
-                        result = true;
+                            result = true;
+                        } else {
+                            result = txMetaExisting.txState() == txMeta.txState()
+                                    && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+                        }
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState()
-                                && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
+                        result = false;
                     }
-                } else {
-                    result = false;
                 }
-            }
 
-            // If the store is in the process of rebalancing, then there is no need to update index lastAppliedIndex and lastAppliedTerm.
-            // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will have
-            // non-consistent storage. They will be updated to either #abortRebalance() or #finishRebalance(long, long).
-            if (state != StorageState.REBALANCE) {
-                writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+                // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
+                // This is necessary to prevent a situation where the node is restarted in the middle of a rebalance, leaving us with
+                // inconsistent storage. They will be updated in either #abortRebalance() or #finishRebalance(long, long).

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org