You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/24 11:47:39 UTC
[shardingsphere] branch master updated: Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 350dd571f86 Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)
350dd571f86 is described below
commit 350dd571f86e395be79e73b1ab63de2f8c4d741b
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Dec 24 19:47:33 2022 +0800
Fix CompletableFuture.anyOf missing callback in tasks runner (#23079)
---
.../core/task/InventoryIncrementalTasksRunner.java | 82 +++++++++++-----------
1 file changed, 40 insertions(+), 42 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 875f9faac39..537b70989d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProg
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Inventory incremental tasks' runner.
@@ -75,19 +76,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
return;
}
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
- if (executeInventoryTask()) {
- if (jobItemContext.isStopping()) {
- return;
- }
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+ log.info("All inventory tasks finished.");
executeIncrementalTask();
+ } else {
+ executeInventoryTask();
}
}
- private synchronized boolean executeInventoryTask() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
- log.info("All inventory tasks finished.");
- return true;
- }
+ private synchronized void executeInventoryTask() {
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (InventoryTask each : inventoryTasks) {
@@ -96,26 +93,26 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
}
futures.addAll(each.start());
}
- CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
- if (null != throwable) {
- log.error("onFailure, inventory task execute failed.", throwable);
- updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
- String jobId = jobItemContext.getJobId();
- jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
- jobAPI.stop(jobId);
- }
- });
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
- if (null == throwable) {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
- log.info("onSuccess, all inventory tasks finished.");
- executeIncrementalTask();
- } else {
- log.info("onSuccess, inventory tasks not finished");
+ AtomicInteger completedCount = new AtomicInteger(0);
+ for (CompletableFuture<?> each : futures) {
+ each.whenComplete((o, throwable) -> {
+ completedCount.addAndGet(1);
+ if (null != throwable) {
+ log.error("onFailure, inventory task execute failed.", throwable);
+ updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+ String jobId = jobItemContext.getJobId();
+ jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobAPI.stop(jobId);
+ } else if (completedCount.get() == futures.size()) {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+ log.info("onSuccess, all inventory tasks finished.");
+ executeIncrementalTask();
+ } else {
+ log.info("onSuccess, inventory tasks not finished");
+ }
}
- }
- });
- return false;
+ });
+ }
}
private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
@@ -140,19 +137,20 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
}
futures.addAll(each.start());
}
- CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
- if (null != throwable) {
- log.error("onFailure, incremental task execute failed.", throwable);
- updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
- String jobId = jobItemContext.getJobId();
- jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
- jobAPI.stop(jobId);
- }
- });
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
- if (null == throwable) {
- log.info("onSuccess, all incremental tasks finished.");
- }
- });
+ AtomicInteger completedCount = new AtomicInteger(0);
+ for (CompletableFuture<?> each : futures) {
+ each.whenComplete((o, throwable) -> {
+ completedCount.addAndGet(1);
+ if (null != throwable) {
+ log.error("onFailure, incremental task execute failed.", throwable);
+ updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+ String jobId = jobItemContext.getJobId();
+ jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobAPI.stop(jobId);
+ } else if (completedCount.get() == futures.size()) {
+ log.info("onSuccess, all incremental tasks finished.");
+ }
+ });
+ }
}
}