Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/26 20:32:22 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

tkalkirill opened a new pull request, #1478:
URL: https://github.com/apache/ignite-3/pull/1478

   https://issues.apache.org/jira/browse/IGNITE-18024


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058133827


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   I did not understand the question.
   If we could not acquire the "busy" lock, then either the storage is closed or a rebalance has started.
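   A minimal sketch of how the failure branch could report the actual reason by looking at the storage state rather than at the busy lock. It assumes a state field like the `StorageState` added in this PR, with a hypothetical `CLOSED` value alongside `RUNNABLE` and `REBALANCE`. The method name mirrors `throwExceptionIfStorageClosedOrRebalance`, but the body and messages are illustrative, not the PR's code:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: pick the exception from the storage state, not from the busy lock. */
class BusyFailureSketch {
    /** RUNNABLE and REBALANCE appear in the PR; CLOSED is an assumption of this sketch. */
    enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    /** Called when busyLock.enterBusy() returns false; always throws. */
    void throwExceptionIfStorageClosedOrRebalance() {
        StorageState current = state.get();

        if (current == StorageState.CLOSED) {
            throw new IllegalStateException("Storage is closed");
        }

        if (current == StorageState.REBALANCE) {
            throw new IllegalStateException("Storage is in the process of rebalancing");
        }

        // Unreachable if the state is always changed before busyLock.block(); fail loudly anyway.
        throw new IllegalStateException("Busy lock is blocked in unexpected state " + current);
    }
}
```

   With distinct messages, a caller that hits a closed storage never sees a misleading mention of rebalance.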



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058131224


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Database key for the last applied index+term. */
-    private final byte[] lastAppliedIndexAndTermKey;
+    /** Current state of the storage. */
+    private volatile StorageState state = StorageState.RUNNABLE;
+
+    @Nullable

Review Comment:
   Is this a problem?



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058387709


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   The "busy" lock only protects against the start of a rebalance, not against what happens after it has started.
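   To illustrate, here is a minimal sketch of a read path that checks the state after entering the busy section: once `startRebalance()` unblocks the lock, `enterBusy()` succeeds again while the storage is still rebalancing. The busy lock is a hypothetical stand-in built on `ReentrantReadWriteLock` (not Ignite's `IgniteSpinBusyLock`), and a `ConcurrentHashMap` stands in for RocksDB:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: the busy lock guards the transition, the state guards the whole rebalance. */
class BusyThenStateCheckSketch {
    enum StorageState { RUNNABLE, REBALANCE }

    /** Stand-in busy lock: read lock = busy operation, write lock = block()/unblock(). */
    private final ReentrantReadWriteLock busyLock = new ReentrantReadWriteLock();

    private volatile StorageState state = StorageState.RUNNABLE;

    private final Map<UUID, String> data = new ConcurrentHashMap<>();

    String get(UUID txId) {
        if (!busyLock.readLock().tryLock()) {
            throw new IllegalStateException("Storage is closed or a state transition is in progress");
        }

        try {
            // Entering the busy section says nothing about rebalance: startRebalance() unblocks
            // the lock as soon as the partition is cleared, so the state must be checked here too.
            if (state == StorageState.REBALANCE) {
                throw new IllegalStateException("Storage is in the process of rebalancing");
            }

            return data.get(txId);
        } finally {
            busyLock.readLock().unlock();
        }
    }

    void put(UUID txId, String txMeta) {
        data.put(txId, txMeta);
    }

    void startRebalance() {
        state = StorageState.REBALANCE;

        // Waits for in-flight operations, like busyLock.block().
        busyLock.writeLock().lock();

        try {
            data.clear();
        } finally {
            // Like busyLock.unblock(): from here on only the state check rejects callers.
            busyLock.writeLock().unlock();
        }
    }
}
```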



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058175467


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                    writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                    db.write(writeOptions, writeBatch);
+                } catch (Exception e) {
+                    throw new IgniteInternalException(
+                            TX_STATE_STORAGE_REBALANCE_ERR,
+                            IgniteStringFormatter.format(
+                                    "Failed to clear storage for partition {} of table {}",
+                                    partitionId,
+                                    getTableName()
+                            ),
+                            e
+                    );
+                }
+            } else {
+                this.lastAppliedIndex = lastAppliedIndex;
+                persistedIndex = lastAppliedIndex;
+
+                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+            }
+        }
+    }
+
+    private boolean close0() {

Review Comment:
   I tried to rename it and add documentation.



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058380201


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   Why do we perform a "busyLockSuccess" check if rebalance is covered by the busy lock itself?



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1057730028


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Database key for the last applied index+term. */
-    private final byte[] lastAppliedIndexAndTermKey;
+    /** Current state of the storage. */
+    private volatile StorageState state = StorageState.RUNNABLE;
+
+    @Nullable

Review Comment:
   Usually you put Nullable next to the type. Why is it different this time?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   Why does the exception mention rebalance if the storage is closed?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -397,12 +429,18 @@ private long readLastAppliedIndex(ReadOptions readOptions) {
 
     @Override
     public void destroy() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            close();
+        if (!close0()) {
+            return;
+        }
 
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+        try {
+            try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   Isn't a single try enough?
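   A single try-with-resources statement can carry its own `catch` and `finally`; the resource is closed before either of them runs. A minimal sketch, with a hypothetical `FakeWriteBatch` standing in for RocksDB's `WriteBatch` and recording the order of events:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: resource handling, error translation and cleanup in one try statement. */
class SingleTrySketch {
    /** Stand-in for RocksDB's WriteBatch that logs what happens to it. */
    static final class FakeWriteBatch implements AutoCloseable {
        private final List<String> log;

        FakeWriteBatch(List<String> log) {
            this.log = log;
        }

        void clearPartition(boolean simulateFailure) throws Exception {
            log.add("clearPartition");

            if (simulateFailure) {
                throw new Exception("simulated RocksDB failure");
            }
        }

        @Override
        public void close() {
            log.add("close");
        }
    }

    static void destroy(boolean simulateFailure, List<String> log) {
        try (FakeWriteBatch writeBatch = new FakeWriteBatch(log)) {
            writeBatch.clearPartition(simulateFailure);
        } catch (Exception e) {
            // The batch is already closed at this point.
            log.add("catch");
            throw new IllegalStateException("Failed to destroy storage", e);
        } finally {
            log.add("finally");
        }
    }
}
```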



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;

Review Comment:
   What is happening? If this future is always completed, why do you need a field for the future at all?
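   If every step completes synchronously, the state alone can drive the rebalance lifecycle. A minimal sketch, assuming that the caller never runs `abortRebalance()` and `finishRebalance()` concurrently, that in-memory fields stand in for the RocksDB write batches, and that `REBALANCE_IN_PROGRESS` is -1 (its real value is not shown here). The sketch persists the received index and term in `finishRebalance()`. The finishRebalance hunk in this PR writes `REBALANCE_IN_PROGRESS` at that point, which looks like it would make `initLastApplied()` clear the partition on the next restart.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: rebalance lifecycle tracked by the state alone, without a future field. */
class RebalanceLifecycleSketch {
    enum StorageState { RUNNABLE, REBALANCE }

    /** Marker stored instead of a real index while rebalancing; the value is assumed. */
    static final long REBALANCE_IN_PROGRESS = -1;

    final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    /** In-memory stand-ins for the value stored under lastAppliedIndexAndTermKey. */
    volatile long persistedIndex;
    volatile long persistedTerm;

    CompletableFuture<Void> startRebalance() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
            throw new IllegalStateException("Cannot start rebalance in state " + state.get());
        }

        // Real code: deleteRange over the partition plus the marker, in one WriteBatch.
        persistedIndex = REBALANCE_IN_PROGRESS;
        persistedTerm = REBALANCE_IN_PROGRESS;

        // The work above is synchronous, so there is nothing to remember for later.
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> abortRebalance() {
        if (state.get() != StorageState.REBALANCE) {
            return CompletableFuture.completedFuture(null); // Nothing to abort.
        }

        // Real code: clear the partition and delete the marker in one WriteBatch.
        persistedIndex = 0;
        persistedTerm = 0;

        state.set(StorageState.RUNNABLE);

        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        if (state.get() != StorageState.REBALANCE) {
            throw new IllegalStateException("Rebalancing has not started");
        }

        // Replace the marker with the index and term received together with the data.
        persistedIndex = lastAppliedIndex;
        persistedTerm = lastAppliedTerm;

        state.set(StorageState.RUNNABLE);

        return CompletableFuture.completedFuture(null);
    }
}
```

   If some later step does become asynchronous, the future can come back as a field at that point, with a reason to exist.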



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   I wonder whether the storage has a designated "start" method.
   Doing recovery in the constructor?... That's a bad idea. Reading data in the constructor is also a bad idea, but I believe that was my fault.
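   A minimal sketch of the split: the constructor only assigns fields, and a separate `start()` reads the persisted index and cleans up after an interrupted rebalance. The class, the `start()` method, the map-based storage and the value of `REBALANCE_IN_PROGRESS` are all assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: keep the constructor cheap and do reads and recovery in start(). */
class StartMethodSketch {
    /** Marker meaning "a rebalance was in progress"; the value is assumed. */
    static final long REBALANCE_IN_PROGRESS = -1;

    /** Stand-in for lastAppliedIndexAndTermKey. */
    static final String LAST_APPLIED_KEY = "lastApplied";

    /** In-memory stand-in for the RocksDB key range of this partition. */
    private final Map<String, Long> db;

    private volatile long lastAppliedIndex;

    StartMethodSketch(Map<String, Long> db) {
        // Only field assignments: no I/O, nothing that can fail halfway through construction.
        this.db = db;
    }

    /** Reads persisted metadata and, if needed, cleans up after an interrupted rebalance. */
    void start() {
        Long persisted = db.get(LAST_APPLIED_KEY);

        if (persisted == null) {
            lastAppliedIndex = 0;
        } else if (persisted == REBALANCE_IN_PROGRESS) {
            // The node stopped mid-rebalance: the partition holds partial data, so drop it.
            db.clear();
            lastAppliedIndex = 0;
        } else {
            lastAppliedIndex = persisted;
        }
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }
}
```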



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -199,21 +221,22 @@ public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMet
 
                         result = true;
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState() && (
-                                (txMetaExisting.commitTimestamp() == null && txMeta.commitTimestamp() == null)
-                                        || txMetaExisting.commitTimestamp().equals(txMeta.commitTimestamp()));
+                        result = txMetaExisting.txState() == txMeta.txState()
+                                && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
                     }
                 } else {
                     result = false;
                 }
             }
 
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+            if (state != StorageState.REBALANCE) {

Review Comment:
   I think places like this deserve comments.
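   For example, the branch could carry a comment along these lines. This is a minimal sketch of one reading of the intent, with in-memory fields instead of the RocksDB write batch and an assumed marker value: while a full rebalance runs, the persisted index and term hold the `REBALANCE_IN_PROGRESS` marker, and overwriting it with a command's index would hide a partially loaded partition from recovery after a crash.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a write path that leaves the rebalance marker alone. */
class LastAppliedUpdateSketch {
    enum StorageState { RUNNABLE, REBALANCE }

    static final long REBALANCE_IN_PROGRESS = -1; // Value assumed for the sketch.

    volatile StorageState state = StorageState.RUNNABLE;

    volatile long lastAppliedIndex;
    volatile long lastAppliedTerm;

    final Map<UUID, String> data = new ConcurrentHashMap<>();

    void put(UUID txId, String txMeta, long commandIndex, long commandTerm) {
        data.put(txId, txMeta);

        // During a full rebalance the stored index/term is the REBALANCE_IN_PROGRESS marker.
        // Replacing it here would hide that the partition is only partially loaded, so a
        // restart would not clean it up. finishRebalance() writes the real values instead.
        if (state != StorageState.REBALANCE) {
            lastAppliedIndex = commandIndex;
            lastAppliedTerm = commandTerm;
        }
    }
}
```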



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                    writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                    db.write(writeOptions, writeBatch);
+                } catch (Exception e) {
+                    throw new IgniteInternalException(
+                            TX_STATE_STORAGE_REBALANCE_ERR,
+                            IgniteStringFormatter.format(
+                                    "Failed to clear storage for partition {} of table {}",
+                                    partitionId,
+                                    getTableName()
+                            ),
+                            e
+                    );
+                }
+            } else {
+                this.lastAppliedIndex = lastAppliedIndex;
+                persistedIndex = lastAppliedIndex;
+
+                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+            }
+        }
+    }
+
+    private boolean close0() {

Review Comment:
   Not the best name for the method that performs so many complex actions.
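
A hypothetical sketch of the contract that a method like `close0()` (renamed `tryToCloseStorageAndResources()` later in this thread) appears to need. It returns `true` only for the call that actually performed the close, which matches how `destroy()` uses it in this PR (`if (!close0()) return;`). The class and counter fields are illustrative stand-ins, not Ignite code, and an `AtomicBoolean` replaces the PR's state handling.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; the counters stand in for real resource release and data deletion.
class ClosableStorageSketch {
    private final AtomicBoolean closed = new AtomicBoolean();

    int releasedResources; // how many times resources were released
    int destroyedTimes;    // how many times partition data was deleted

    /** Returns true only for the call that actually closed the storage. */
    boolean tryToCloseStorageAndResources() {
        if (!closed.compareAndSet(false, true)) {
            return false; // already closed by an earlier call
        }
        releasedResources++; // stand-in for closing iterators and other native resources
        return true;
    }

    void close() {
        tryToCloseStorageAndResources();
    }

    void destroy() {
        if (!tryToCloseStorageAndResources()) {
            return; // another call already closed (or destroyed) the storage
        }
        destroyedTimes++; // stand-in for deleting the partition's key range
    }
}
```

One consequence of this shape: calling `destroy()` after a plain `close()` deletes nothing, because the second call to the close method returns `false`.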



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
+    private static final VarHandle STATE;
+
+    private static final VarHandle REBALANCE_FUTURE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);

Review Comment:
   What's the problem with using atomics?
   Are you sure that placing these values inside the object is beneficial? Do you have proof?
   I'd prefer code simplicity if the advantage is not clear.



##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock, RocksIterator it
     /**
      * Handles busy lock acquiring failure. This means that db has been stopped and cursor can't proceed. Must throw an exception.
      */
-    protected abstract void handleBusy();
+    protected abstract void handleBusyFail();
 
-    private void handleBusy0() {
-        handleBusy();
+    /**
+     * Handles busy lock acquiring success.
+     */
+    protected void handeBusySuccess() {

Review Comment:
   Why did you introduce this method? No one uses it. Can you please explain it?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();

Review Comment:
   Why do you need a busy lock in this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058168297


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;

Review Comment:
   In general, I can get rid of `TxStateRocksDbStorage#rebalanceFuture`, and I will.
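
The hunk above persists a `REBALANCE_IN_PROGRESS` marker under `lastAppliedIndexAndTermKey` before any data arrives, and `initLastApplied()` in this PR wipes the partition when it finds that marker on startup. A minimal in-memory model of that protocol, assuming a `HashMap` in place of RocksDB, a marker value of `-1`, and big-endian `ByteBuffer` encoding (all assumptions, not taken from the PR):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: the map plays the role of the partition's RocksDB key range and survives "restarts". */
class RebalanceMarkerModel {
    static final long REBALANCE_IN_PROGRESS = -1; // assumed marker value
    static final String LAST_APPLIED_KEY = "lastAppliedIndexAndTerm";

    final Map<String, byte[]> disk;
    long lastAppliedIndex;
    long lastAppliedTerm;

    RebalanceMarkerModel(Map<String, byte[]> disk) {
        this.disk = disk;
        initLastApplied();
    }

    static byte[] indexAndTermToBytes(long index, long term) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(index).putLong(term).array();
    }

    private void initLastApplied() {
        byte[] bytes = disk.get(LAST_APPLIED_KEY);
        if (bytes == null) {
            return; // fresh storage
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long index = buf.getLong();
        if (index == REBALANCE_IN_PROGRESS) {
            disk.clear(); // crashed mid-rebalance: the received data is incomplete, drop it with the marker
        } else {
            lastAppliedIndex = index;
            lastAppliedTerm = buf.getLong();
        }
    }

    void startRebalance() {
        disk.clear(); // stand-in for deleteRange over the partition
        disk.put(LAST_APPLIED_KEY, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
        lastAppliedIndex = REBALANCE_IN_PROGRESS;
        lastAppliedTerm = REBALANCE_IN_PROGRESS;
    }

    void finishRebalance(long index, long term) {
        // Persist the real values; persisting the marker here would make the next restart wipe the data.
        disk.put(LAST_APPLIED_KEY, indexAndTermToBytes(index, term));
        lastAppliedIndex = index;
        lastAppliedTerm = term;
    }
}
```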



[GitHub] [ignite-3] tkalkirill merged pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill merged PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058379134


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
+    private static final VarHandle STATE;
+
+    private static final VarHandle REBALANCE_FUTURE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);

Review Comment:
   OK, I give up. Generally speaking, less code is better than more code, and the benefit of the saved memory is not clear at all.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058744868


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -479,40 +515,52 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        close0();
+        tryToCloseStorageAndResources();
     }
 
     @Override
     public CompletableFuture<Void> startRebalance() {
-        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
-            throwExceptionIfStorageClosedOrRebalance();
+        CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+        if (!REBALANCE_FUTURE.compareAndSet(this, null, rebalanceFuture)) {
+            throw createStorageInProgressOfRebalanceException();
         }
 
-        busyLock.block();
+        try {
+            if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+                throwExceptionIfStorageClosedOrRebalance();
+            }
 
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+            busyLock.block();
 
-            db.write(writeOptions, writeBatch);
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-            lastAppliedIndex = REBALANCE_IN_PROGRESS;
-            lastAppliedTerm = REBALANCE_IN_PROGRESS;
-            persistedIndex = REBALANCE_IN_PROGRESS;
+                db.write(writeOptions, writeBatch);
 
-            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+                lastAppliedIndex = REBALANCE_IN_PROGRESS;
+                lastAppliedTerm = REBALANCE_IN_PROGRESS;
+                persistedIndex = REBALANCE_IN_PROGRESS;
 
-            this.rebalanceFuture = rebalanceFuture;
+                rebalanceFuture.complete(null);
 
-            return rebalanceFuture;
-        } catch (Exception e) {
-            throw new IgniteInternalException(
-                    TX_STATE_STORAGE_REBALANCE_ERR,
-                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
-                    e
-            );
-        } finally {
-            busyLock.unblock();
+                return rebalanceFuture;
+            } catch (Exception e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_REBALANCE_ERR,
+                        IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                        e
+                );
+            } finally {
+                busyLock.unblock();
+            }
+        } catch (IgniteInternalException e) {

Review Comment:
   This handles the exceptions from lines 531 and 550. Yes, the state needs to be dealt with; I will fix this a bit.
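
One possible way to deal with the state when `startRebalance()` fails, sketched with plain atomics instead of the PR's `VarHandle`s: release the rebalance-future slot, fail the future, and roll the state back if the clearing write itself failed. `RebalanceState`, `clearPartition()` and the `failClear` switch are hypothetical. Rolling back to `RUNNABLE` is only safe if the failed write left nothing behind, which holds for a RocksDB `WriteBatch` because it is applied atomically.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

enum RebalanceState { RUNNABLE, REBALANCE, CLOSED }

class RebalanceRollbackSketch {
    final AtomicReference<CompletableFuture<Void>> rebalanceFuture = new AtomicReference<>();
    final AtomicReference<RebalanceState> state = new AtomicReference<>(RebalanceState.RUNNABLE);

    boolean failClear; // hypothetical switch that simulates a failing storage write

    CompletableFuture<Void> startRebalance() {
        CompletableFuture<Void> future = new CompletableFuture<>();

        if (!rebalanceFuture.compareAndSet(null, future)) {
            throw new IllegalStateException("Rebalance is already in progress");
        }

        try {
            if (!state.compareAndSet(RebalanceState.RUNNABLE, RebalanceState.REBALANCE)) {
                throw new IllegalStateException("Storage is not runnable: " + state.get());
            }

            try {
                clearPartition();
            } catch (RuntimeException e) {
                // The clearing write failed as a whole, so undo the transition made above.
                state.compareAndSet(RebalanceState.REBALANCE, RebalanceState.RUNNABLE);
                throw e;
            }

            future.complete(null);
            return future;
        } catch (RuntimeException e) {
            // Free the slot so a later attempt can start, and fail the future for anyone waiting on it.
            rebalanceFuture.compareAndSet(future, null);
            future.completeExceptionally(e);
            throw e;
        }
    }

    private void clearPartition() {
        if (failClear) {
            throw new IllegalStateException("Simulated write failure");
        }
    }
}
```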



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058747218


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java:
##########
@@ -153,6 +153,8 @@ public TxStateStorage getOrCreateTxStateStorage(int partitionId) {
                 partitionId,
                 this
             );
+
+            storage.start();

Review Comment:
   Ok.



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058116796


##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock, RocksIterator it
     /**
      * Handles busy lock acquiring failure. This means that db has been stopped and cursor can't proceed. Must throw an exception.
      */
-    protected abstract void handleBusy();
+    protected abstract void handleBusyFail();
 
-    private void handleBusy0() {
-        handleBusy();
+    /**
+     * Handles busy lock acquiring success.
+     */
+    protected void handeBusySuccess() {

Review Comment:
   OK, I found a usage.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058198431


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;

Review Comment:
   I'll try to change it up a bit.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058142428


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();

Review Comment:
   Because I want to make sure that when we start a storage rebalance, all operations on it will be stopped.
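
A minimal sketch of why blocking the busy lock helps here: `block()` waits for operations that have already entered the lock and makes new `enterBusy()` calls fail, so the partition is cleared only after concurrent callers have left. `SimpleBusyLock` is a hypothetical stand-in for `IgniteSpinBusyLock` that reuses its four method names; it assumes only one thread calls `block()` at a time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical busy lock: the low bits count active callers, the BLOCKED bit marks that block() was called. */
class SimpleBusyLock {
    private static final int BLOCKED = 1 << 30;

    private final AtomicInteger state = new AtomicInteger();

    boolean enterBusy() {
        if ((state.incrementAndGet() & BLOCKED) != 0) {
            state.decrementAndGet(); // blocked: undo our increment and refuse entry
            return false;
        }
        return true;
    }

    void leaveBusy() {
        state.decrementAndGet();
    }

    /** Rejects new callers, then waits until every caller that already entered has left. */
    void block() {
        state.getAndAdd(BLOCKED);
        while (state.get() != BLOCKED) {
            Thread.onSpinWait();
        }
    }

    void unblock() {
        state.getAndAdd(-BLOCKED);
    }
}

class RebalancingStorageSketch {
    private final SimpleBusyLock busyLock = new SimpleBusyLock();
    private final Map<String, String> data = new ConcurrentHashMap<>();

    void put(String key, String value) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Storage is closed or being rebalanced");
        }
        try {
            data.put(key, value);
        } finally {
            busyLock.leaveBusy();
        }
    }

    String get(String key) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Storage is closed or being rebalanced");
        }
        try {
            return data.get(key);
        } finally {
            busyLock.leaveBusy();
        }
    }

    void startRebalance() {
        busyLock.block(); // drain in-flight get()/put() calls and reject new ones
        try {
            data.clear(); // stand-in for the deleteRange + marker write
        } finally {
            busyLock.unblock();
        }
    }
}
```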



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058384986


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbTableStorage.java:
##########
@@ -153,6 +153,8 @@ public TxStateStorage getOrCreateTxStateStorage(int partitionId) {
                 partitionId,
                 this
             );
+
+            storage.start();

Review Comment:
   Technically, we should close the instance if `start()` throws an exception, right? We may fix that in the future. This is why I don't like exceptions in constructors: they can easily produce leaks.
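
A sketch of the cleanup being described: if `start()` throws, close the half-initialized instance before rethrowing so that its native resources are not leaked. `Storage`, `CountingStorage` and `createAndStart` are hypothetical names, not Ignite's API.

```java
import java.util.function.Supplier;

class LeakSafeFactorySketch {
    /** Hypothetical storage contract: a cheap constructor plus a start() that may fail. */
    interface Storage extends AutoCloseable {
        void start();

        @Override
        void close(); // narrowed: no checked exception
    }

    static <S extends Storage> S createAndStart(Supplier<S> constructor) {
        S storage = constructor.get();
        try {
            storage.start();
            return storage;
        } catch (RuntimeException | Error e) {
            try {
                storage.close(); // release whatever start() managed to open
            } catch (RuntimeException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}

/** Test double that records close() calls and can be told to fail in start(). */
class CountingStorage implements LeakSafeFactorySketch.Storage {
    private final boolean failOnStart;
    int closeCalls;

    CountingStorage(boolean failOnStart) {
        this.failOnStart = failOnStart;
    }

    @Override
    public void start() {
        if (failOnStart) {
            throw new IllegalStateException("start failed");
        }
    }

    @Override
    public void close() {
        closeCalls++;
    }
}
```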



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058148957


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -199,21 +221,22 @@ public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMet
 
                         result = true;
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState() && (
-                                (txMetaExisting.commitTimestamp() == null && txMeta.commitTimestamp() == null)
-                                        || txMetaExisting.commitTimestamp().equals(txMeta.commitTimestamp()));
+                        result = txMetaExisting.txState() == txMeta.txState()
+                                && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
                     }
                 } else {
                     result = false;
                 }
             }
 
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
+            if (state != StorageState.REBALANCE) {

Review Comment:
   Added
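
The hunk above also replaces a hand-written null check with `Objects.equals`. The old expression throws a `NullPointerException` when the existing commit timestamp is `null` and the new one is not. A small illustration, with the timestamp type abstracted to `Object` and a hypothetical class name:

```java
import java.util.Objects;

class CommitTimestampCheckSketch {
    /** Shape of the old condition: NPE when only the existing timestamp is null. */
    static boolean oldCheck(Object existing, Object expected) {
        return (existing == null && expected == null) || existing.equals(expected);
    }

    /** Shape of the new condition: null-safe on both sides. */
    static boolean newCheck(Object existing, Object expected) {
        return Objects.equals(existing, expected);
    }
}
```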



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058383886


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -479,40 +515,52 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        close0();
+        tryToCloseStorageAndResources();
     }
 
     @Override
     public CompletableFuture<Void> startRebalance() {
-        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
-            throwExceptionIfStorageClosedOrRebalance();
+        CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+        if (!REBALANCE_FUTURE.compareAndSet(this, null, rebalanceFuture)) {
+            throw createStorageInProgressOfRebalanceException();
         }
 
-        busyLock.block();
+        try {
+            if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+                throwExceptionIfStorageClosedOrRebalance();
+            }
 
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
-            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+            busyLock.block();
 
-            db.write(writeOptions, writeBatch);
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-            lastAppliedIndex = REBALANCE_IN_PROGRESS;
-            lastAppliedTerm = REBALANCE_IN_PROGRESS;
-            persistedIndex = REBALANCE_IN_PROGRESS;
+                db.write(writeOptions, writeBatch);
 
-            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+                lastAppliedIndex = REBALANCE_IN_PROGRESS;
+                lastAppliedTerm = REBALANCE_IN_PROGRESS;
+                persistedIndex = REBALANCE_IN_PROGRESS;
 
-            this.rebalanceFuture = rebalanceFuture;
+                rebalanceFuture.complete(null);
 
-            return rebalanceFuture;
-        } catch (Exception e) {
-            throw new IgniteInternalException(
-                    TX_STATE_STORAGE_REBALANCE_ERR,
-                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
-                    e
-            );
-        } finally {
-            busyLock.unblock();
+                return rebalanceFuture;
+            } catch (Exception e) {
+                throw new IgniteInternalException(
+                        TX_STATE_STORAGE_REBALANCE_ERR,
+                        IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                        e
+                );
+            } finally {
+                busyLock.unblock();
+            }
+        } catch (IgniteInternalException e) {

Review Comment:
   Is this the exception from the line 550? Are there other sources for such type of exceptions? What do we do with the state?



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058141738


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -397,12 +429,18 @@ private long readLastAppliedIndex(ReadOptions readOptions) {
 
     @Override
     public void destroy() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            close();
+        if (!close0()) {
+            return;
+        }
 
-            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+        try {
+            try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   Fixed.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058126669


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
+    private static final VarHandle STATE;
+
+    private static final VarHandle REBALANCE_FUTURE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", StorageState.class);

Review Comment:
   Since there can be many partitions, using `VarHandle` instead of atomics saves a little memory per partition. The less memory is used, the faster the GC runs, and the faster the system runs.
   
   Yes, it looks like premature optimization, but I am not using any complex constructs, so the code does not become much more complicated.
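
For comparison, a sketch of the two options under discussion. An `AtomicReference` field allocates one extra object per storage instance, whereas a `volatile` field updated through a `static final` `VarHandle` does not. The class and enum names are hypothetical; how much memory this saves in practice depends on the JVM and the number of partitions, and is not quantified in this thread.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicReference;

enum StorageStateSketch { RUNNABLE, REBALANCE, CLOSED }

/** Option A: simpler code, one extra AtomicReference object per instance. */
class AtomicStateHolder {
    private final AtomicReference<StorageStateSketch> state = new AtomicReference<>(StorageStateSketch.RUNNABLE);

    boolean startRebalance() {
        return state.compareAndSet(StorageStateSketch.RUNNABLE, StorageStateSketch.REBALANCE);
    }
}

/** Option B: a plain volatile field, updated atomically through a VarHandle shared by all instances. */
class VarHandleStateHolder {
    private static final VarHandle STATE;

    static {
        try {
            STATE = MethodHandles.lookup().findVarHandle(VarHandleStateHolder.class, "state", StorageStateSketch.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile StorageStateSketch state = StorageStateSketch.RUNNABLE;

    boolean startRebalance() {
        return STATE.compareAndSet(this, StorageStateSketch.RUNNABLE, StorageStateSketch.REBALANCE);
    }
}
```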



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058171337


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   I'll move it to the "start" method.
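   This thread is about where to handle a leftover `REBALANCE_IN_PROGRESS` marker that `initLastApplied` finds on disk. Below is a minimal, hypothetical sketch of one plausible version of the marker protocol. In the sketch, a persisted marker means a rebalance never completed, and `start()` runs the recovery, as proposed in the comment. `abortRebalance` wipes the partition and resets index/term to zero. `finishRebalance` overwrites the marker with the real index/term. Assumptions: an in-memory `Disk` replaces RocksDB, the sentinel is `-1` (the real constant's value is not shown), and `startRebalance` persisting the marker is inferred, not shown in the diff. `Disk` and `MarkerStorage` are illustrative names, not Ignite classes.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for what RocksDB persists for one partition. */
class Disk {
    final Map<String, String> rows = new TreeMap<>();
    long[] indexAndTerm; // null: nothing persisted yet
}

/** Illustrative sketch of the REBALANCE_IN_PROGRESS marker protocol; not the Ignite TxStateStorage API. */
class MarkerStorage {
    static final long REBALANCE_IN_PROGRESS = -1; // assumed sentinel value

    private final Disk disk;
    private final AtomicReference<CompletableFuture<Void>> rebalanceFuture = new AtomicReference<>();
    long lastAppliedIndex;
    long lastAppliedTerm;

    MarkerStorage(Disk disk) {
        this.disk = disk;
    }

    /** Recovery on start: a leftover marker means a rebalance was interrupted, so drop the partial data. */
    void start() {
        if (disk.indexAndTerm != null && disk.indexAndTerm[0] == REBALANCE_IN_PROGRESS) {
            disk.rows.clear();
            disk.indexAndTerm = null;
        }
        lastAppliedIndex = disk.indexAndTerm == null ? 0 : disk.indexAndTerm[0];
        lastAppliedTerm = disk.indexAndTerm == null ? 0 : disk.indexAndTerm[1];
    }

    /** Clears the partition and persists the marker before any rebalance data is written. */
    CompletableFuture<Void> startRebalance() {
        CompletableFuture<Void> fut = CompletableFuture.completedFuture(null);
        if (!rebalanceFuture.compareAndSet(null, fut)) {
            throw new IllegalStateException("Rebalance is already in progress");
        }
        disk.rows.clear();
        disk.indexAndTerm = new long[] {REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS};
        return fut;
    }

    void put(String txId, String txMeta) {
        disk.rows.put(txId, txMeta);
    }

    /** Overwrites the marker with the real index and term, so a restart keeps the received data. */
    CompletableFuture<Void> finishRebalance(long index, long term) {
        CompletableFuture<Void> fut = rebalanceFuture.getAndSet(null);
        if (fut == null) {
            throw new IllegalStateException("Rebalancing has not started");
        }
        return fut.thenRun(() -> {
            disk.indexAndTerm = new long[] {index, term};
            lastAppliedIndex = index;
            lastAppliedTerm = term;
        });
    }

    /** Drops the received data and resets index and term to zero; a no-op if no rebalance is running. */
    CompletableFuture<Void> abortRebalance() {
        CompletableFuture<Void> fut = rebalanceFuture.getAndSet(null);
        if (fut == null) {
            return CompletableFuture.completedFuture(null);
        }
        return fut.thenRun(() -> {
            disk.rows.clear();
            disk.indexAndTerm = null;
            lastAppliedIndex = 0;
            lastAppliedTerm = 0;
        });
    }
}
```

   If `finishRebalance` persisted the marker instead of the real index/term, the next `start()` would treat the freshly received data as an interrupted rebalance and wipe it.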





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058121380


##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock busyLock, RocksIterator it
     /**
      * Handles busy lock acquiring failure. This means that db has been stopped and cursor can't proceed. Must throw an exception.
      */
-    protected abstract void handleBusy();
+    protected abstract void handleBusyFail();
 
-    private void handleBusy0() {
-        handleBusy();
+    /**
+     * Handles busy lock acquiring success.
+     */
+    protected void handeBusySuccess() {

Review Comment:
   This method is used in `TxStateRocksDbStorage#scan`.
   In the `scan` method above, after acquiring the "busy" lock we need to check that the storage is not being rebalanced.
   
   Alternatively, we can get rid of `handeBusySuccess` and use a dedicated cursor in `TxStateRocksDbStorage#scan`, so that we don't bolt a workaround onto the adapter. I think that's the better option.
   
   wdyt?
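   One way to read the proposal to replace the `handeBusySuccess` hook with a dedicated cursor in `TxStateRocksDbStorage#scan`: every cursor call first enters the busy lock and then rejects the call if the storage is not `RUNNABLE`. This is a sketch under assumptions. `SimpleBusyLock` is a hypothetical stand-in for `IgniteSpinBusyLock`, a plain `Iterator` replaces the `RocksIterator`, and the `REBALANCE`/`CLOSED` states are assumed (the diff only shows `StorageState.RUNNABLE`).

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Assumed state set; the diff only shows StorageState.RUNNABLE. */
enum StorageState { RUNNABLE, REBALANCE, CLOSED }

/** Hypothetical minimal busy lock standing in for IgniteSpinBusyLock. */
class SimpleBusyLock {
    // >= 0: number of threads inside the busy section; -1: blocked.
    private final AtomicInteger state = new AtomicInteger();

    boolean enterBusy() {
        while (true) {
            int current = state.get();
            if (current < 0) {
                return false;
            }
            if (state.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    void leaveBusy() {
        state.decrementAndGet();
    }

    /** Waits until no thread is inside the busy section, then rejects new entries. */
    void block() {
        while (!state.compareAndSet(0, -1)) {
            Thread.onSpinWait();
        }
    }

    void unblock() {
        state.set(0);
    }
}

/** Hypothetical dedicated scan cursor: each call enters the busy lock, then checks the storage state. */
class TxStateScanCursor<T> implements Iterator<T> {
    private final SimpleBusyLock busyLock;
    private final Supplier<StorageState> stateSupplier;
    private final Iterator<T> delegate; // stands in for the RocksIterator

    TxStateScanCursor(SimpleBusyLock busyLock, Supplier<StorageState> stateSupplier, Iterator<T> delegate) {
        this.busyLock = busyLock;
        this.stateSupplier = stateSupplier;
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return inBusyLock(delegate::hasNext);
    }

    @Override
    public T next() {
        return inBusyLock(delegate::next);
    }

    private <R> R inBusyLock(Supplier<R> action) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Storage is closed or being rebalanced");
        }
        try {
            // The check from the review comment: holding the busy lock does not rule out a running rebalance.
            StorageState current = stateSupplier.get();
            if (current != StorageState.RUNNABLE) {
                throw new IllegalStateException("Storage is not runnable: " + current);
            }
            return action.get();
        } finally {
            busyLock.leaveBusy();
        }
    }
}
```

   Keeping the state check inside the storage's own cursor leaves `BusyRocksIteratorAdapter` with a single failure hook, which is the trade-off the comment argues for.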





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1478: IGNITE-18024 Implementation of a full rebalance for TxStateRocksDbStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058379315


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Database key for the last applied index+term. */
-    private final byte[] lastAppliedIndexAndTermKey;
+    /** Current state of the storage. */
+    private volatile StorageState state = StorageState.RUNNABLE;
+
+    @Nullable

Review Comment:
   Just wondering, what's up with the inconsistency?


