Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/16 13:57:23 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

tkalkirill opened a new pull request, #1530:
URL: https://github.com/apache/ignite-3/pull/1530

   https://issues.apache.org/jira/browse/IGNITE-18027


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081185536


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();

Review Comment:
   No need, because at the beginning we are trying to change the state of the storage on line 1474.
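   The reply above relies on `compareAndSet` being exclusive. Once one caller has moved the state from `RUNNABLE` to `REBALANCE`, every other transition that expects `RUNNABLE` fails, whether it is a second rebalance or `close()`. Below is a minimal sketch of that guarantee. It assumes a simplified three-value enum, and `SketchState`, `StateGuardSketch` and `raceStartRebalance` are hypothetical names, not code from the PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified mirror of the storage states used in the diff. */
enum SketchState { RUNNABLE, REBALANCE, CLOSED }

/** Hypothetical storage whose lifecycle transitions are guarded by a single compareAndSet. */
class StateGuardSketch {
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.RUNNABLE);

    /** Like the first step of startRebalance: only a RUNNABLE storage may enter REBALANCE. */
    boolean tryStartRebalance() {
        return state.compareAndSet(SketchState.RUNNABLE, SketchState.REBALANCE);
    }

    /** Like close() in the diff: only a RUNNABLE storage may become CLOSED. */
    boolean tryClose() {
        return state.compareAndSet(SketchState.RUNNABLE, SketchState.CLOSED);
    }

    SketchState state() {
        return state.get();
    }

    /** Lets several threads race to start a rebalance on one storage and counts how many succeed. */
    static int raceStartRebalance(int threads) {
        StateGuardSketch storage = new StateGuardSketch();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch startSignal = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    startSignal.await(); // line all workers up before they hit the CAS
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (storage.tryStartRebalance()) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }

        startSignal.countDown();

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }

        return winners.get();
    }
}
```

   Whatever the interleaving, `raceStartRebalance(n)` returns 1 for any `n >= 1`. After a successful start, `tryClose()` returns `false`. This is the property the reply relies on.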





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081158465


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -330,6 +333,14 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
         });
     }
 
+    private void lastAppliedBusy(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   I think so; the current name does a poor job of describing what this method actually does.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081184519


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -330,6 +333,14 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
         });
     }
 
+    private void lastAppliedBusy(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Fixed.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081126504


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -967,15 +1005,18 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {

Review Comment:
   We can, but why? Is there a need for this?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081126727


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {

Review Comment:
   Why?
   There are already similar methods in `AbstractPageMemoryMvPartitionStorage`; I think it's better to keep the two implementations consistent.





[GitHub] [ignite-3] tkalkirill merged pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill merged PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530




[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081075418


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -330,6 +333,14 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
         });
     }
 
+    private void lastAppliedBusy(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Why is this method called like that? Can we rename it to `saveLastApplied`?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -967,15 +1005,18 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {

Review Comment:
   What if this method gets called during a rebalance?
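   If closing during a rebalance should be allowed, one option is to let `close()` leave any non-closed state through a CAS loop. A later attempt to finish the rebalance would then find `CLOSED` and fail. This is only a sketch of one possible policy and not necessarily what the PR ended up doing. `LifecycleState` and `ClosableStorageSketch` are hypothetical names:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical lifecycle states, modelled on the ones in the diff. */
enum LifecycleState { RUNNABLE, REBALANCE, CLOSED }

/** Hypothetical storage whose close() is allowed from both RUNNABLE and REBALANCE. */
class ClosableStorageSketch {
    private final AtomicReference<LifecycleState> state = new AtomicReference<>(LifecycleState.RUNNABLE);

    boolean startRebalance() {
        return state.compareAndSet(LifecycleState.RUNNABLE, LifecycleState.REBALANCE);
    }

    /** Succeeds only if the storage is still rebalancing, so it fails after a concurrent close(). */
    boolean finishRebalance() {
        return state.compareAndSet(LifecycleState.REBALANCE, LifecycleState.RUNNABLE);
    }

    /** Moves any non-closed state to CLOSED; returns false if the storage was already closed. */
    boolean close() {
        while (true) {
            LifecycleState current = state.get();

            if (current == LifecycleState.CLOSED) {
                return false;
            }

            // Retry if another thread changed the state between get() and compareAndSet().
            if (state.compareAndSet(current, LifecycleState.CLOSED)) {
                return true;
            }
        }
    }

    LifecycleState state() {
        return state.get();
    }
}
```

   A real implementation would also have to wait for in-flight rebalance work, for example through the busy lock, before releasing resources. The sketch covers only the state transition.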



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();

Review Comment:
   Is it possible that a storage also gets stopped during a rebalance?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -396,4 +415,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Same here about the method's name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java:
##########
@@ -262,4 +290,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Same here about this method's name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   `rebalanceFuture` is always completed, so this is effectively a synchronous call: `thenAccept` on an already-completed future runs the callback immediately in the current thread.
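   The point about `thenAccept` can be checked directly. In the OpenJDK implementation, a non-async dependent action registered on an already-completed `CompletableFuture` runs in the calling thread before `thenAccept` returns. There is a related consequence: an exception thrown inside the callback, such as the `StorageRebalanceException` in the diff, does not propagate to the caller. Instead, it completes the returned future exceptionally. Below is a minimal sketch, where `CompletedFutureSketch` is a hypothetical name:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CompletedFutureSketch {
    /** Returns the thread that ran a thenAccept callback registered on an already-completed future. */
    static Thread threadRunningCallback() {
        AtomicReference<Thread> runner = new AtomicReference<>();

        CompletableFuture<Void> dependent = CompletableFuture.<Void>completedFuture(null)
                .thenAccept(unused -> runner.set(Thread.currentThread()));

        // The callback has already run, so the dependent future is complete here.
        if (!dependent.isDone()) {
            throw new IllegalStateException("Expected the dependent future to be complete");
        }

        return runner.get();
    }

    /** Returns a future whose thenAccept callback throws, to show where the exception ends up. */
    static CompletableFuture<Void> failingCallback() {
        return CompletableFuture.<Void>completedFuture(null).thenAccept(unused -> {
            throw new IllegalStateException("simulated rebalance failure");
        });
    }
}
```

   Calling `join()` on the future returned by `failingCallback()` throws a `CompletionException` whose cause is the `IllegalStateException`.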



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {

Review Comment:
   I think `getStorageInfo` is a better name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   I don't understand this part. Why do we need to have a map with futures if it always stores completed futures?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to finish rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void lastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Same here: `saveLastAppliedOnRebalance` would be a better name.
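   The three rebalance entry points in this diff differ mainly in which `lastApplied` values they write and whether they clear the partition:
   - `startRebalance` clears the data and writes `REBALANCE_IN_PROGRESS`.
   - `abortReblance` clears the data again and writes `0`.
   - `finishRebalance` keeps the data and writes the real index and term.

   Below is a minimal in-memory sketch of that lifecycle, using the suggested `saveLastAppliedOnRebalance` name. Several parts are assumptions:
   - The class is hypothetical.
   - A `TreeMap` stands in for the column family, and writes go straight to it rather than through a `WriteBatch`.
   - The group config and partition id keys are left out.
   - The sentinel value `-1` is invented, since the diff does not show the value of `REBALANCE_IN_PROGRESS`.

```java
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a partition storage during a full rebalance. */
class RebalanceLifecycleSketch {
    /** Assumed sentinel value; the real REBALANCE_IN_PROGRESS constant is not shown in the diff. */
    static final long REBALANCE_IN_PROGRESS = -1;

    /** Stands in for the partition's rows in the column family. */
    private final TreeMap<String, String> rows = new TreeMap<>();

    private long lastAppliedIndex;
    private long lastAppliedTerm;

    void put(String key, String value) {
        rows.put(key, value);
    }

    /** Drops the old data and marks the partition as being rebalanced. */
    void startRebalance() {
        clearStorageOnRebalance(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
    }

    /** Drops whatever was received so far and resets lastApplied to zero. */
    void abortRebalance() {
        clearStorageOnRebalance(0, 0);
    }

    /** Keeps the received data and records the snapshot's lastApplied position. */
    void finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        saveLastAppliedOnRebalance(lastAppliedIndex, lastAppliedTerm);
    }

    private void clearStorageOnRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        saveLastAppliedOnRebalance(lastAppliedIndex, lastAppliedTerm);
        rows.clear(); // plays the role of writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix())
    }

    private void saveLastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        this.lastAppliedIndex = lastAppliedIndex;
        this.lastAppliedTerm = lastAppliedTerm;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long lastAppliedTerm() {
        return lastAppliedTerm;
    }

    int rowCount() {
        return rows.size();
    }
}
```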



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    mvPartitionStorage.abortReblance(writeBatch);
+
+                    getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+                    getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+
+                    db.write(writeOptions, writeBatch);
+                } catch (RocksDBException e) {
+                    throw new StorageRebalanceException(
+                            "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                            e
+                    );
+                }
+            });
+        });
     }
 
     @Override
     public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                throw new StorageRebalanceException("Rebalance for partition did not start: " + mvPartitionStorage.createStorageInfo());
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   Same here





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081126934


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -330,6 +333,14 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
         });
     }
 
+    private void lastAppliedBusy(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   We can, but why? Is there a need for this?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081132844


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   Basically, I can replace the `Map` with a `Set`. Wdyt?
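   Every stored future is already completed, so the map only records which partitions are being rebalanced, and a concurrent set carries the same information. Below is a minimal sketch of `Set`-based bookkeeping using `ConcurrentHashMap.newKeySet()`. `RebalanceTrackerSketch` and its methods are hypothetical names, not the PR's code:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker of partitions with a rebalance in progress. */
class RebalanceTrackerSketch {
    private final Set<Integer> partitionIdsUnderRebalance = ConcurrentHashMap.newKeySet();

    /** Returns true if no rebalance was in progress for the partition (replaces putIfAbsent == null). */
    boolean markStarted(int partitionId) {
        return partitionIdsUnderRebalance.add(partitionId);
    }

    /** Returns true if a rebalance was in progress and is now cleared (replaces remove != null). */
    boolean markStopped(int partitionId) {
        return partitionIdsUnderRebalance.remove(partitionId);
    }

    boolean isRebalancing(int partitionId) {
        return partitionIdsUnderRebalance.contains(partitionId);
    }
}
```

   With this change:
   - `startRebalancePartition` would assert that `markStarted` returned `true`.
   - `abortRebalancePartition` would return early when `markStopped` returns `false`.
   - `finishRebalancePartition` would throw when `markStopped` returns `false`, matching the existing `null` checks.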





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081129297


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java:
##########
@@ -262,4 +290,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Why?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -396,4 +415,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Why?





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081160394


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   Yes, that would make more sense





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081197136


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   Fix it



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   Fix it
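The abort path chains its cleanup onto the registered rebalance future with `thenAccept`. Below is a minimal sketch of that pattern. It assumes a simplified, hypothetical class (`AbortSketch`) and uses a `Runnable` in place of the `WriteBatch` work. One detail matters when fixing this: if the callback throws, for example the `StorageRebalanceException` from the reviewed code, the returned future completes exceptionally with a `CompletionException` that wraps it. Callers of `join()` therefore have to unwrap the cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of the abort path; not Ignite's actual class. */
class AbortSketch {
    final ConcurrentMap<Integer, CompletableFuture<Void>> rebalanceFutureByPartitionId = new ConcurrentHashMap<>();

    CompletableFuture<Void> abortRebalancePartition(int partitionId, Runnable cleanup) {
        CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);

        if (rebalanceFuture == null) {
            // Nothing to abort: a no-op, as in the reviewed code.
            return CompletableFuture.completedFuture(null);
        }

        // If cleanup throws, the returned future completes exceptionally with a
        // CompletionException whose cause is the thrown exception.
        return rebalanceFuture.thenAccept(unused -> cleanup.run());
    }

    /** Returns the original failure hidden behind the CompletionException, or null on success. */
    static Throwable rootCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```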





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081128748


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();

Review Comment:
   Documentation for `org.apache.ignite.internal.storage.MvPartitionStorage#close` says:
   `<p>REQUIRED: For background tasks for partition, such as rebalancing, to be completed by the time the method is called.`
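The quoted contract means `close()` should only succeed after a rebalance has finished or been aborted. Below is a sketch of that guard. It assumes an `AtomicReference` holding the `StorageState` values seen in the diff. The class name `PartitionStateSketch` is hypothetical. Two further choices are assumptions made only to surface contract violations: `close()` throws during `REBALANCE` instead of returning silently, and a plain `IllegalStateException` stands in for `throwExceptionDependingOnStorageStateOnRebalance`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** States mirroring the StorageState values used in the reviewed diff. */
enum StorageState { RUNNABLE, REBALANCE, CLOSED }

/** Hypothetical sketch of the state guard; not Ignite's actual class. */
class PartitionStateSketch {
    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    StorageState state() {
        return state.get();
    }

    void startRebalance() {
        transition(StorageState.RUNNABLE, StorageState.REBALANCE);
    }

    void finishRebalance() {
        transition(StorageState.REBALANCE, StorageState.RUNNABLE);
    }

    /** Closing is allowed only from RUNNABLE: a rebalance must be completed first. */
    boolean close() {
        if (state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
            return true;
        }
        // A repeated close() is treated as a no-op.
        if (state.get() == StorageState.CLOSED) {
            return false;
        }
        throw new IllegalStateException("Cannot close storage in state " + state.get());
    }

    private void transition(StorageState expected, StorageState next) {
        if (!state.compareAndSet(expected, next)) {
            throw new IllegalStateException("Expected " + expected + " but was " + state.get());
        }
    }
}
```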





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081159865


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java:
##########
@@ -262,4 +290,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   resolved



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -396,4 +415,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   resolved





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081128431


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -967,15 +1005,18 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {

Review Comment:
   Documentation for `org.apache.ignite.internal.storage.MvPartitionStorage#close` says:
   `<p>REQUIRED: For background tasks for partition, such as rebalancing, to be completed by the time the method is called.`





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081126727


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {

Review Comment:
   Why?





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081159533


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();

Review Comment:
   Maybe we should have an assertion here? Otherwise we will simply stall
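The concern here is that `block()` waits for every operation that has already entered the busy lock. A leaked or long-running operation would therefore make `startRebalance` hang. The sketch below uses a hypothetical counter-based lock (`SketchBusyLock`, not Ignite's busy lock implementation) to contrast waiting with one possible reading of the suggestion: a fail-fast check that nothing is in flight.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter-based busy lock, for illustration only. */
class SketchBusyLock {
    private final AtomicInteger inFlight = new AtomicInteger();
    private volatile boolean blocked;

    boolean enterBusy() {
        if (blocked) {
            return false;
        }
        inFlight.incrementAndGet();
        // Re-check after announcing ourselves, so a concurrent block() either sees us or we see it.
        if (blocked) {
            inFlight.decrementAndGet();
            return false;
        }
        return true;
    }

    void leaveBusy() {
        inFlight.decrementAndGet();
    }

    /** Waits for in-flight operations: this is where the caller can stall indefinitely. */
    void block() {
        blocked = true;
        while (inFlight.get() != 0) {
            Thread.yield();
        }
    }

    /** Fail-fast alternative: refuses to block while any operation is still in flight. */
    void blockOrFail() {
        blocked = true;
        int active = inFlight.get();
        if (active != 0) {
            blocked = false;
            throw new IllegalStateException("Operations still in progress: " + active);
        }
    }

    void unblock() {
        blocked = false;
    }
}
```

The fail-fast variant can report a spurious failure when another thread is between its increment and its re-check in `enterBusy`. It fits best as a debugging assertion, not as a replacement for waiting.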





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081185912


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void lastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Fix it
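`clearStorageOnRebalance` does three things: it resets the applied markers, drops the group config and partition id meta keys, and range-deletes the partition's rows. The sketch below mirrors that with in-memory maps. It assumes string keys with a `pN:` prefix, a hypothetical encoding. `TreeMap.subMap(start, true, end, false)` plays the role of RocksDB `deleteRange`, whose end key is exclusive. Unlike the real code, the sketch applies changes immediately. The reviewed code instead collects them in a `WriteBatch` that `db.write` commits atomically.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory analogue of clearStorageOnRebalance; key names and encoding are hypothetical. */
class ClearOnRebalanceSketch {
    final NavigableMap<String, String> data = new TreeMap<>(); // stands in for the partition column family
    final Map<String, Long> meta = new HashMap<>();            // stands in for the meta column family

    static String startPrefix(int partitionId) {
        return "p" + partitionId + ":";
    }

    // ';' is the character right after ':', so [start, end) covers exactly the "pN:" keys.
    static String endPrefix(int partitionId) {
        return "p" + partitionId + ";";
    }

    void clearStorageOnRebalance(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
        meta.put("lastAppliedIndex", lastAppliedIndex);
        meta.put("lastAppliedTerm", lastAppliedTerm);
        meta.remove("lastGroupConfig");
        meta.remove("partitionId:" + partitionId);
        // Like RocksDB's deleteRange, the end bound is exclusive.
        data.subMap(startPrefix(partitionId), true, endPrefix(partitionId), false).clear();
    }
}
```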





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081197351


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    mvPartitionStorage.abortReblance(writeBatch);
+
+                    getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+                    getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+
+                    db.write(writeOptions, writeBatch);
+                } catch (RocksDBException e) {
+                    throw new StorageRebalanceException(
+                            "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                            e
+                    );
+                }
+            });
+        });
     }
 
     @Override
     public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                throw new StorageRebalanceException("Rebalance for partition did not start: " + mvPartitionStorage.createStorageInfo());
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   Fix it
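The two paths treat a missing rebalance future differently. In `finishRebalancePartition` it is an error, while `abortRebalancePartition` treats it as a no-op. The finishing writes also run only after the registered future completes. The sketch below shows that bookkeeping with hypothetical names (`FinishRebalanceSketch`, `registerStarted`). A `Runnable` stands in for writing `lastAppliedIndex`/`lastAppliedTerm`. The sketch throws `IllegalStateException` where the reviewed code relies on `assert`, because assertions may be disabled at runtime.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of the start/finish bookkeeping; not Ignite's actual class. */
class FinishRebalanceSketch {
    private final ConcurrentMap<Integer, CompletableFuture<Void>> rebalanceFutureByPartitionId =
            new ConcurrentHashMap<>();

    /** Registers a started rebalance; a second start for the same partition is rejected. */
    void registerStarted(int partitionId, CompletableFuture<Void> rebalanceFuture) {
        if (rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture) != null) {
            throw new IllegalStateException("Rebalance already started: partitionId=" + partitionId);
        }
    }

    boolean isRebalancing(int partitionId) {
        return rebalanceFutureByPartitionId.containsKey(partitionId);
    }

    /** Unregisters the rebalance and runs the finishing step once the rebalance future completes. */
    CompletableFuture<Void> finish(int partitionId, Runnable finishingStep) {
        CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);

        if (rebalanceFuture == null) {
            // Unlike abort, finishing a rebalance that never started is an error.
            throw new IllegalStateException("Rebalance for partition did not start: partitionId=" + partitionId);
        }

        return rebalanceFuture.thenRun(finishingStep);
    }
}
```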





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081158798


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -967,15 +1005,18 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {

Review Comment:
   ok



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {

Review Comment:
   Agree





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081133385


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   You're right.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081132844


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   In general, I can replace with `Set`, wdyt?
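Every registered future is `completedFuture(null)`, so the map only records which partitions are rebalancing, and a concurrent set would carry the same information. The sketch below shows that alternative using `ConcurrentHashMap.newKeySet()`. The class name is hypothetical. The trade-off is that a set leaves nothing to chain `thenAccept` onto if the rebalance later becomes genuinely asynchronous.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical set-based tracking of partitions under rebalance. */
class RebalanceSetSketch {
    private final Set<Integer> rebalancingPartitions = ConcurrentHashMap.newKeySet();

    void start(int partitionId) {
        // add() returns false if the partition is already registered.
        if (!rebalancingPartitions.add(partitionId)) {
            throw new IllegalStateException("Rebalance already started: partitionId=" + partitionId);
        }
    }

    /** Returns true if a rebalance was in progress and has now been unregistered. */
    boolean abort(int partitionId) {
        return rebalancingPartitions.remove(partitionId);
    }

    void finish(int partitionId) {
        if (!rebalancingPartitions.remove(partitionId)) {
            throw new IllegalStateException("Rebalance for partition did not start: partitionId=" + partitionId);
        }
    }
}
```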





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081129036


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void lastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Why?
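`lastAppliedOnRebalance` writes the applied index and term that each phase leaves behind: `REBALANCE_IN_PROGRESS` on start, `0` on abort, and the caller's values on finish. The sketch below shows those transitions. It assumes `REBALANCE_IN_PROGRESS` is `-1`, because the diff does not show its value. Plain fields stand in for the `meta` column family. One further point is an inference, not something stated in the review: a persisted marker could let a restarted node detect an interrupted rebalance.

```java
/** Hypothetical sketch of the lastApplied markers written by each rebalance phase. */
class LastAppliedSketch {
    /** Assumed sentinel value; the actual constant is not shown in the diff. */
    static final long REBALANCE_IN_PROGRESS = -1;

    long lastAppliedIndex;
    long lastAppliedTerm;

    private void lastAppliedOnRebalance(long index, long term) {
        lastAppliedIndex = index;
        lastAppliedTerm = term;
    }

    void startRebalance() {
        lastAppliedOnRebalance(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
    }

    void abortRebalance() {
        lastAppliedOnRebalance(0, 0);
    }

    void finishRebalance(long index, long term) {
        lastAppliedOnRebalance(index, term);
    }

    boolean rebalanceInProgress() {
        return lastAppliedIndex == REBALANCE_IN_PROGRESS;
    }
}
```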


