Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/19 10:56:41 UTC

[GitHub] [ignite-3] sashapolo commented on a diff in pull request #1530: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver

sashapolo commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081075418


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -330,6 +333,14 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
         });
     }
 
+    private void lastAppliedBusy(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Why is this method named this way? Can we rename it to `saveLastApplied`?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -967,15 +1005,18 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
-        writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+        writeBatch.delete(meta, partitionIdKey(partitionId));
 
         writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        if (!stopGuard.compareAndSet(false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {

Review Comment:
   What if this method gets called during a rebalance?
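
   To make the concern concrete, here is a minimal sketch of the state guard, assuming a simplified three-value `StorageState` enum and a hypothetical `StorageStateDemo` class (neither is the PR's actual code). The hunk does not show what `close()` does when the compare-and-set fails, so the sketch only reports whether the transition happened.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StorageStateDemo {
    // Hypothetical stand-in for the StorageState enum used in the PR.
    enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    /** Mirrors the guard in startRebalance(): RUNNABLE -> REBALANCE. */
    boolean startRebalance() {
        return state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE);
    }

    /** Mirrors the guard in close(): only a RUNNABLE storage moves to CLOSED. */
    boolean close() {
        return state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED);
    }

    StorageState state() {
        return state.get();
    }

    public static void main(String[] args) {
        StorageStateDemo storage = new StorageStateDemo();
        storage.startRebalance();

        // The storage is in REBALANCE, so the RUNNABLE -> CLOSED transition is rejected.
        System.out.println("close() changed state: " + storage.close());
        System.out.println("state after close(): " + storage.state());
    }
}
```

   Under these assumptions, a `close()` that arrives while the state is `REBALANCE` leaves the state unchanged, so the question is what the real method should do on that path.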



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();

Review Comment:
   Is it possible that a storage also gets stopped during a rebalance?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java:
##########
@@ -396,4 +415,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Same here about the method's name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java:
##########
@@ -262,4 +290,58 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    private String createStorageInfo() {

Review Comment:
   Same here about this method's name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
   `rebalanceFuture` is always completed, so this is effectively a synchronous call: `thenAccept` will likely be executed immediately by the current thread
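
   A small standalone sketch of this behavior, using only `java.util.concurrent.CompletableFuture`. The class and method names are hypothetical. It relies on the behavior of the non-async `thenAccept` in OpenJDK, where a callback attached to an already completed future runs on the calling thread before `thenAccept` returns.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureDemo {
    /** Returns the name of the thread that ran the thenAccept callback. */
    static String callbackThreadName() {
        CompletableFuture<Void> alreadyCompleted = CompletableFuture.completedFuture(null);

        String[] callbackThread = new String[1];

        // The future is already complete, so the callback runs right here,
        // on the caller's thread, before thenAccept returns.
        alreadyCompleted.thenAccept(unused -> callbackThread[0] = Thread.currentThread().getName());

        return callbackThread[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        System.out.println("caller=" + caller + ", callback=" + callbackThreadName());
    }
}
```

   If the work is meant to run off the calling thread, `thenAcceptAsync` with an explicit executor would be one option, although whether that is desirable here is a design question for the PR.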



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {

Review Comment:
   I think `getStorageInfo` is a better name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);

Review Comment:
   I don't understand this part. Why do we need to have a map with futures if it always stores completed futures?
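
   One possible reading of this concern: if the stored futures are always completed, the map only records which partitions are currently rebalancing, and a concurrent set could carry the same information. The sketch below is an assumption about such an alternative, with hypothetical names (`RebalanceTrackerDemo`, `markStarted`, `markFinished`); it is not what the PR implements.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RebalanceTrackerDemo {
    // Thread-safe set of partition IDs that currently have a rebalance in progress.
    private final Set<Integer> partitionsInRebalance = ConcurrentHashMap.newKeySet();

    /** Returns true if the partition was not already rebalancing (analogous to putIfAbsent returning null). */
    boolean markStarted(int partitionId) {
        return partitionsInRebalance.add(partitionId);
    }

    /** Returns true if a rebalance was in progress for the partition (analogous to remove returning non-null). */
    boolean markFinished(int partitionId) {
        return partitionsInRebalance.remove(partitionId);
    }

    public static void main(String[] args) {
        RebalanceTrackerDemo tracker = new RebalanceTrackerDemo();

        System.out.println("first start: " + tracker.markStarted(1));
        System.out.println("second start: " + tracker.markStarted(1));
        System.out.println("finish: " + tracker.markFinished(1));
        System.out.println("finish again: " + tracker.markFinished(1));
    }
}
```

   A map of futures would still be the natural choice if the start of a rebalance became genuinely asynchronous, since abort and finish could then chain on a pending future.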



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        try {
+            lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+    }
+
+    private void lastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Same here, `saveLastAppliedOnRebalance` would be a better name



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -635,19 +644,117 @@ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, Sor
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                mvPartitionStorage.startRebalance(writeBatch);
+
+                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+                db.write(writeOptions, writeBatch);
+
+                CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+                CompletableFuture<Void> previousRebalanceFuture = rebalanceFutureByPartitionId.putIfAbsent(partitionId, rebalanceFuture);
+
+                assert previousRebalanceFuture == null : mvPartitionStorage.createStorageInfo();
+
+                return rebalanceFuture;
+            } catch (RocksDBException e) {
+                throw new StorageRebalanceException(
+                        "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                        e
+                );
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                return completedFuture(null);
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    mvPartitionStorage.abortReblance(writeBatch);
+
+                    getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+                    getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+
+                    db.write(writeOptions, writeBatch);
+                } catch (RocksDBException e) {
+                    throw new StorageRebalanceException(
+                            "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+                            e
+                    );
+                }
+            });
+        });
     }
 
     @Override
     public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18027 Implement
-        throw new UnsupportedOperationException();
+        return inBusyLock(busyLock, () -> {
+            RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+            if (mvPartitionStorage == null) {
+                throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+            }
+
+            CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
+
+            if (rebalanceFuture == null) {
+                throw new StorageRebalanceException("Rebalance for partition did not start: " + mvPartitionStorage.createStorageInfo());
+            }
+
+            return rebalanceFuture.thenAccept(unused -> {

Review Comment:
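   Same here: `rebalanceFuture` is always completed, so `thenAccept` will likely run synchronously on the current thread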


