Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/27 15:26:06 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1057730028


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Database key for the last applied index+term. */
-    private final byte[] lastAppliedIndexAndTermKey;
+    /** Current state of the storage. */
+    private volatile StorageState state = StorageState.RUNNABLE;
+
+    @Nullable

Review Comment:
   Usually you put Nullable next to the type. Why is it different this time?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   What's up with the rebalance in the exception if the storage is closed?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -397,12 +429,18 @@ private long readLastAppliedIndex(ReadOptions readOptions) {
 
     @Override
     public void destroy() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            close();
+        if (!close0()) {
+            return;
+        }
 
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+        try {
+            try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   Isn't a single try enough?
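
A minimal sketch of the single-try idea, assuming the outer `try` in the diff exists only to attach `catch`/`finally` clauses. A try-with-resources statement can carry those clauses itself, and the resource is closed before they run. `FakeBatch` and `SingleTryExample` are hypothetical stand-ins, not classes from the PR or from RocksDB.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a native resource such as a write batch. */
class FakeBatch implements AutoCloseable {
    private final List<String> log;

    FakeBatch(List<String> log) {
        this.log = log;
        log.add("open");
    }

    void write(boolean fail) {
        if (fail) {
            throw new IllegalStateException("write failed");
        }
        log.add("write");
    }

    @Override
    public void close() {
        log.add("close");
    }
}

class SingleTryExample {
    /** One try-with-resources statement holds the resource, the catch and the finally. */
    static List<String> destroy(boolean fail) {
        List<String> log = new ArrayList<>();

        try (FakeBatch batch = new FakeBatch(log)) {
            batch.write(fail);
        } catch (IllegalStateException e) {
            // The resource has already been closed when this block runs.
            log.add("catch");
        } finally {
            log.add("finally");
        }

        return log;
    }
}
```

With this shape the log order is open, write, close, finally on success, and open, close, catch, finally on failure, so no nested `try` is needed just to handle the exception.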



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;

Review Comment:
   What is happening? If this future is always completed, why do you need a field for the future at all?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   I wonder if there's a designated "start" method in the storage.
   Doing recovery in the constructor? That's a bad idea. Reading data in the constructor is also a bad idea, but that was my fault, I believe.
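
One possible shape for a designated start method, as a rough sketch only: the constructor just stores references, and all reading and recovery happens in `start()`. Everything here is hypothetical (the class name, the `Map` standing in for the database, the key name and the `-1` sentinel); it is not the storage API from the PR.

```java
import java.util.Map;

class StartableStorageExample {
    /** Hypothetical sentinel marking an unfinished rebalance. */
    static final long REBALANCE_IN_PROGRESS = -1L;

    /** Hypothetical key under which the last applied index is stored. */
    static final String LAST_APPLIED_KEY = "lastApplied";

    private final Map<String, Long> db;

    private long lastAppliedIndex;

    private boolean started;

    /** The constructor performs no reads and no recovery. */
    StartableStorageExample(Map<String, Long> db) {
        this.db = db;
    }

    /** Reads persisted state and cleans up after an interrupted rebalance. */
    void start() {
        Long stored = db.get(LAST_APPLIED_KEY);

        if (stored != null && stored == REBALANCE_IN_PROGRESS) {
            // The rebalance did not finish: drop the partial data.
            db.clear();
            lastAppliedIndex = 0;
        } else if (stored != null) {
            lastAppliedIndex = stored;
        }

        started = true;
    }

    long lastAppliedIndex() {
        if (!started) {
            throw new IllegalStateException("Storage is not started");
        }

        return lastAppliedIndex;
    }
}
```

A failure during recovery then surfaces from `start()`, where the caller can handle it, instead of from object construction.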



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -199,21 +221,22 @@ public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMet
 
                         result = true;
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState() && (
-                                (txMetaExisting.commitTimestamp() == null && txMeta.commitTimestamp() == null)
-                                        || txMetaExisting.commitTimestamp().equals(txMeta.commitTimestamp()));
+                        result = txMetaExisting.txState() == txMeta.txState()
+                                && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
                     }
                 } else {
                     result = false;
                 }
             }
 
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+            if (state != StorageState.REBALANCE) {

Review Comment:
   I think places like this deserve comments.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                    writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                    db.write(writeOptions, writeBatch);
+                } catch (Exception e) {
+                    throw new IgniteInternalException(
+                            TX_STATE_STORAGE_REBALANCE_ERR,
+                            IgniteStringFormatter.format(
+                                    "Failed to clear storage for partition {} of table {}",
+                                    partitionId,
+                                    getTableName()
+                            ),
+                            e
+                    );
+                }
+            } else {
+                this.lastAppliedIndex = lastAppliedIndex;
+                persistedIndex = lastAppliedIndex;
+
+                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+            }
+        }
+    }
+
+    private boolean close0() {

Review Comment:
   Not the best name for a method that performs so many complex actions.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
+    private static final VarHandle STATE;
+
+    private static final VarHandle REBALANCE_FUTURE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);

Review Comment:
   What's the problem with using atomics?
   Are you sure that placing these values inside the object is beneficial? Do you have proof?
   I'd prefer code simplicity if the advantage is not clear.
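
For comparison, a minimal sketch of the same state transition written with an atomic instead of a `VarHandle`. `SketchState` and `AtomicStateExample` are hypothetical names; only the RUNNABLE and REBALANCE states appear in the diff, and CLOSED is an assumption added for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical state enum; CLOSED is an assumption. */
enum SketchState {
    RUNNABLE,
    REBALANCE,
    CLOSED
}

class AtomicStateExample {
    /** The atomic replaces both the volatile field and the static VarHandle lookup. */
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.RUNNABLE);

    /** Returns true only for the caller that wins the RUNNABLE -> REBALANCE transition. */
    boolean startRebalance() {
        return state.compareAndSet(SketchState.RUNNABLE, SketchState.REBALANCE);
    }

    void finishRebalance() {
        state.set(SketchState.RUNNABLE);
    }

    SketchState current() {
        return state.get();
    }
}
```

The cost is one extra object per storage instance; in exchange there is no static initializer, no reflective lookup and no unchecked cast.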



##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock, RocksIterator it
     /**
      * Handles busy lock acquiring failure. This means that db has been stopped and cursor can't proceed. Must throw an exception.
      */
-    protected abstract void handleBusy();
+    protected abstract void handleBusyFail();
 
-    private void handleBusy0() {
-        handleBusy();
+    /**
+     * Handles busy lock acquiring success.
+     */
+    protected void handeBusySuccess() {

Review Comment:
   Why did you introduce this method? No one uses it. Can you please explain it?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();

Review Comment:
   Why do you need a busy lock in this method?


