You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/15 01:06:19 UTC

[dolphinscheduler] branch dev updated: fix workflow keep running when task fail (#11930)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6868876a29 fix workflow keep running when task fail (#11930)
6868876a29 is described below

commit 6868876a29c3099a3ea82ce1b65e82c9302fdd8d
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Sep 15 09:06:10 2022 +0800

    fix workflow keep running when task fail (#11930)
---
 .../server/master/runner/WorkflowExecuteRunnable.java     | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 8e4e64528e..67ac86808c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Workflow execute task, used to execute a workflow instance.
@@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
-     * depend failed task map, taskCode as key, taskId as value
+     * depend failed task set
      */
-    private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+    private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
 
     /**
      * forbidden task map, code as key
@@ -805,7 +806,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
         taskFailedSubmit = false;
         activeTaskProcessorMaps.clear();
-        dependFailedTaskMap.clear();
+        dependFailedTaskSet.clear();
         completeTaskMap.clear();
         errorTaskMap.clear();
 
@@ -908,8 +909,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             }
         }
-        logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
-                dependFailedTaskMap,
+        logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
+                dependFailedTaskSet,
                 completeTaskMap,
                 errorTaskMap);
     }
@@ -1494,7 +1495,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (this.errorTaskMap.size() > 0) {
             return true;
         }
-        return this.dependFailedTaskMap.size() > 0;
+        return this.dependFailedTaskSet.size() > 0;
     }
 
     /**
@@ -1845,7 +1846,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             } else if (DependResult.FAILED == dependResult) {
                 // if the dependency fails, the current node is not submitted and the state changes to failure.
-                dependFailedTaskMap.put(task.getTaskCode(), task.getId());
+                dependFailedTaskSet.add(task.getTaskCode());
                 removeTaskFromStandbyList(task);
                 logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
                         dependResult);