You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/19 09:41:24 UTC

[dolphinscheduler] 04/04: fix workflow keep running when task fail (#11930)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 780a509f673359c0a5c408e4f20cfc35fea024d3
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 09:06:10 2022 +0800

    fix workflow keep running when task fail (#11930)
---
 .../server/master/runner/WorkflowExecuteRunnable.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9e90b8c8a0..9d6eef3259 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * depend failed task map, taskCode as key, taskId as value
+     * depend failed task set
      */
-    private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+    private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
 
     /**
      * forbidden task map, code as key
@@ -804,7 +805,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
         taskFailedSubmit = false;
         activeTaskProcessorMaps.clear();
-        dependFailedTaskMap.clear();
+        dependFailedTaskSet.clear();
         completeTaskMap.clear();
         errorTaskMap.clear();
 
@@ -904,8 +905,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             }
         }
-        logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
-                dependFailedTaskMap,
+        logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
+                dependFailedTaskSet,
                 completeTaskMap,
                 errorTaskMap);
     }
@@ -1484,7 +1485,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (this.errorTaskMap.size() > 0) {
             return true;
         }
-        return this.dependFailedTaskMap.size() > 0;
+        return this.dependFailedTaskSet.size() > 0;
     }
 
     /**
@@ -1835,7 +1836,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             } else if (DependResult.FAILED == dependResult) {
                 // if the dependency fails, the current node is not submitted and the state changes to failure.
-                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+                dependFailedTaskSet.add(task.getTaskCode());
                 removeTaskFromStandbyList(task);
                 logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
                         dependResult);