You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/24 11:47:39 UTC

[shardingsphere] branch master updated: Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 350dd571f86 Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)
350dd571f86 is described below

commit 350dd571f86e395be79e73b1ab63de2f8c4d741b
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Dec 24 19:47:33 2022 +0800

    Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)
---
 .../core/task/InventoryIncrementalTasksRunner.java | 82 +++++++++++-----------
 1 file changed, 40 insertions(+), 42 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 875f9faac39..537b70989d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProg
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Inventory incremental tasks' runner.
@@ -75,19 +76,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             return;
         }
         PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
-        if (executeInventoryTask()) {
-            if (jobItemContext.isStopping()) {
-                return;
-            }
+        if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+            log.info("All inventory tasks finished.");
             executeIncrementalTask();
+        } else {
+            executeInventoryTask();
         }
     }
     
-    private synchronized boolean executeInventoryTask() {
-        if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
-            log.info("All inventory tasks finished.");
-            return true;
-        }
+    private synchronized void executeInventoryTask() {
         updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (InventoryTask each : inventoryTasks) {
@@ -96,26 +93,26 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             }
             futures.addAll(each.start());
         }
-        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
-            if (null != throwable) {
-                log.error("onFailure, inventory task execute failed.", throwable);
-                updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
-                String jobId = jobItemContext.getJobId();
-                jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
-                jobAPI.stop(jobId);
-            }
-        });
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
-            if (null == throwable) {
-                if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
-                    log.info("onSuccess, all inventory tasks finished.");
-                    executeIncrementalTask();
-                } else {
-                    log.info("onSuccess, inventory tasks not finished");
+        AtomicInteger completedCount = new AtomicInteger(0);
+        for (CompletableFuture<?> each : futures) {
+            each.whenComplete((o, throwable) -> {
+                completedCount.addAndGet(1);
+                if (null != throwable) {
+                    log.error("onFailure, inventory task execute failed.", throwable);
+                    updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+                    String jobId = jobItemContext.getJobId();
+                    jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+                    jobAPI.stop(jobId);
+                } else if (completedCount.get() == futures.size()) {
+                    if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+                        log.info("onSuccess, all inventory tasks finished.");
+                        executeIncrementalTask();
+                    } else {
+                        log.info("onSuccess, inventory tasks not finished");
+                    }
                 }
-            }
-        });
-        return false;
+            });
+        }
     }
     
     private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
@@ -140,19 +137,20 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             }
             futures.addAll(each.start());
         }
-        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
-            if (null != throwable) {
-                log.error("onFailure, incremental task execute failed.", throwable);
-                updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
-                String jobId = jobItemContext.getJobId();
-                jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
-                jobAPI.stop(jobId);
-            }
-        });
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
-            if (null == throwable) {
-                log.info("onSuccess, all incremental tasks finished.");
-            }
-        });
+        AtomicInteger completedCount = new AtomicInteger(0);
+        for (CompletableFuture<?> each : futures) {
+            each.whenComplete((o, throwable) -> {
+                completedCount.addAndGet(1);
+                if (null != throwable) {
+                    log.error("onFailure, incremental task execute failed.", throwable);
+                    updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+                    String jobId = jobItemContext.getJobId();
+                    jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+                    jobAPI.stop(jobId);
+                } else if (completedCount.get() == futures.size()) {
+                    log.info("onSuccess, all incremental tasks finished.");
+                }
+            });
+        }
     }
 }