Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/01/23 10:40:06 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

tkalkirill opened a new pull request, #1562:
URL: https://github.com/apache/ignite-3/pull/1562

   https://issues.apache.org/jira/browse/IGNITE-18030


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562#discussion_r1084898581


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -157,9 +162,9 @@ public void join() throws InterruptedException {
     public void cancel() {
         canceled = true;
 
-        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
 
-        CompletableFuture<?> fut = future;
+        CompletableFuture<?> fut = rebalanceFuture;

Review Comment:
   Discussed in person; it will work.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562#discussion_r1084898716


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))

Review Comment:
   Typo; will fix it.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))
+                .thenCompose(Function.identity())
+                .whenComplete((unused, throwable) -> {
+                    if (throwable == null) {
+                        joinFuture.complete(null);

Review Comment:
   Will fix it.





[GitHub] [ignite-3] tkalkirill merged pull request #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill merged PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562#discussion_r1085369033


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -167,6 +167,9 @@ public long maxLastAppliedTerm() {
 
     @Override
     public CompletableFuture<Void> startRebalance() {
+        // TODO: IGNITE-18619 Fix this bullshit, we should have already waited for the indexes to be created

Review Comment:
   Language :) We don't commit swear words into main





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1562: IGNITE-18030 Integration of the new full rebalance API with IncomingSnapshotCopier

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1562:
URL: https://github.com/apache/ignite-3/pull/1562#discussion_r1084876619


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))

Review Comment:
   Why "completes" instead of "complete"?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -96,38 +102,37 @@ public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage,
     public void start() {
         Executor executor = partitionSnapshotStorage.getIncomingSnapshotsExecutor();
 
-        LOG.info("Copier is started for the partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
+
+        joinFuture = new CompletableFuture<>();
 
-        future = prepareMvPartitionStorageForRebalance()
-                .thenCompose(unused -> prepareTxStatePartitionStorageForRebalance(executor))
+        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
                 .thenCompose(unused -> {
                     ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
 
                     if (snapshotSender == null) {
-                        LOG.error(
-                                "Snapshot sender not found [partId={}, tableId={}, nodeName={}]",
-                                partId(),
-                                tableId(),
-                                snapshotUri.nodeName
-                        );
-
-                        if (!isOk()) {
-                            setError(RaftError.UNKNOWN, "Sender node was not found or it is offline");
-                        }
-
-                        return completedFuture(null);
+                        throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName);
                     }
 
                     return loadSnapshotMeta(snapshotSender)
                             .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor))
-                            .thenAcceptAsync(unused1 -> updateLastAppliedIndexFromSnapshotMetaForStorages(), executor);
+                            .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+                });
+
+        rebalanceFuture.handle((unused, throwable) -> completesRebalance(throwable))
+                .thenCompose(Function.identity())
+                .whenComplete((unused, throwable) -> {
+                    if (throwable == null) {
+                        joinFuture.complete(null);

Review Comment:
   Why do you need a second object? This is not explained and not obvious at all
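
For readers following the hunk above, here is a minimal, self-contained sketch of the chaining pattern it uses: `handle(...)` returns a future of a future, `thenCompose(Function.identity())` flattens it, and a separate future is completed at the very end. Everything in it is an assumption made for illustration. `RebalanceChainSketch`, `completeRebalance` and `run` are hypothetical names, already-completed futures stand in for the real storage and network calls, and the failure branch (not visible in the quoted diff) is guessed to complete the join future exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class RebalanceChainSketch {
    /** Hypothetical stand-in for the asynchronous finishing step; not the real Ignite method. */
    static CompletableFuture<Void> completeRebalance(Throwable error) {
        // Success: finish normally. Failure: clean-up is omitted, the error is re-surfaced.
        return error == null
                ? CompletableFuture.<Void>completedFuture(null)
                : CompletableFuture.<Void>failedFuture(error);
    }

    /** Builds the chain and reports how the future awaited by join() ended. */
    static String run(boolean senderMissing) {
        CompletableFuture<Void> joinFuture = new CompletableFuture<>();

        // Stand-in for startRebalance() followed by the snapshot loading steps.
        CompletableFuture<Void> rebalanceFuture = CompletableFuture.<Void>completedFuture(null)
                .thenCompose(unused -> {
                    if (senderMissing) {
                        // Throwing inside thenCompose completes the returned future exceptionally.
                        throw new IllegalStateException("Snapshot sender not found");
                    }

                    return CompletableFuture.<Void>completedFuture(null);
                });

        rebalanceFuture
                // handle() runs on both outcomes and yields a future of a future...
                .handle((unused, throwable) -> completeRebalance(throwable))
                // ...which thenCompose(Function.identity()) flattens into a single stage.
                .thenCompose(Function.identity())
                .whenComplete((unused, throwable) -> {
                    if (throwable == null) {
                        joinFuture.complete(null);
                    } else {
                        // Assumption: the real else-branch is not shown in the quoted diff.
                        joinFuture.completeExceptionally(throwable);
                    }
                });

        return joinFuture.isCompletedExceptionally() ? "failed" : "completed";
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In this sketch the second future only completes after the finishing step has run, which is one possible reason to keep it separate from the future that `cancel()` targets. Whether that is the author's actual motivation is not stated in the thread.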



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -157,9 +162,9 @@ public void join() throws InterruptedException {
     public void cancel() {
         canceled = true;
 
-        LOG.info("Copier is canceled for partition [partId={}, tableId={}]", partId(), tableId());
+        LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
 
-        CompletableFuture<?> fut = future;
+        CompletableFuture<?> fut = rebalanceFuture;

Review Comment:
   Does it work? Do we have a test that would show that "cancel" works when there are no messages received by the node?
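
As a rough illustration of the scenario in question, the sketch below cancels a dependent `CompletableFuture` while the upstream request is still unanswered. It only demonstrates standard `CompletableFuture` behaviour, not the copier itself: `CancelPendingSketch`, `neverAnswered` and `cancelWhilePending` are hypothetical names, and a never-completed future stands in for a node that receives no messages.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class CancelPendingSketch {
    /** Returns true if cancelling the dependent future unblocks join() while upstream stays pending. */
    static boolean cancelWhilePending() {
        // Hypothetical stand-in for a snapshot request that never gets a response.
        CompletableFuture<Void> neverAnswered = new CompletableFuture<>();

        // Stand-in for the rebalance future built on top of that request.
        CompletableFuture<Void> rebalanceFuture = neverAnswered.thenApply(v -> v);

        // cancel() completes this future with a CancellationException right away.
        boolean cancelled = rebalanceFuture.cancel(false);

        try {
            rebalanceFuture.join();

            return false;
        } catch (CancellationException e) {
            // Cancelling a dependent future does not complete or cancel the upstream one.
            return cancelled && !neverAnswered.isDone();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelWhilePending());
    }
}
```

A real test would still need to check that the copier's own `join()` returns and that the storages are left in a consistent state, which this sketch does not cover.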


