You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/19 09:41:24 UTC
[dolphinscheduler] 04/04: fix workflow keep running when task fail (#11930)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 780a509f673359c0a5c408e4f20cfc35fea024d3
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 09:06:10 2022 +0800
fix workflow keep running when task fail (#11930)
---
.../server/master/runner/WorkflowExecuteRunnable.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9e90b8c8a0..9d6eef3259 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* Workflow execute task, used to execute a workflow instance.
@@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
- * depend failed task map, taskCode as key, taskId as value
+ * depend failed task set
*/
- private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+ private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
/**
* forbidden task map, code as key
@@ -804,7 +805,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskFailedSubmit = false;
activeTaskProcessorMaps.clear();
- dependFailedTaskMap.clear();
+ dependFailedTaskSet.clear();
completeTaskMap.clear();
errorTaskMap.clear();
@@ -904,8 +905,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
}
- logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
- dependFailedTaskMap,
+ logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
+ dependFailedTaskSet,
completeTaskMap,
errorTaskMap);
}
@@ -1484,7 +1485,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (this.errorTaskMap.size() > 0) {
return true;
}
- return this.dependFailedTaskMap.size() > 0;
+ return this.dependFailedTaskSet.size() > 0;
}
/**
@@ -1835,7 +1836,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
- dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+ dependFailedTaskSet.add(task.getTaskCode());
removeTaskFromStandbyList(task);
logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
dependResult);