You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/10/19 01:36:53 UTC

[dolphinscheduler] branch dev updated: make sure all failed task will save in errorTaskMap (#12424)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38b643f69b make sure all failed task will save in errorTaskMap (#12424)
38b643f69b is described below

commit 38b643f69b65f4de9dd43809404470934bfadc7b
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Wed Oct 19 09:36:47 2022 +0800

    make sure all failed task will save in errorTaskMap (#12424)
---
 .../master/runner/WorkflowExecuteRunnable.java     | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 962ed187d6..403135f3bd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -391,13 +391,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 retryTaskInstance(taskInstance);
             } else if (taskInstance.getState().isFailure()) {
                 completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+                errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                 // There are child nodes and the failure policy is: CONTINUE
                 if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                         Long.toString(taskInstance.getTaskCode()),
                         dag)) {
                     submitPostNode(Long.toString(taskInstance.getTaskCode()));
                 } else {
-                    errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                     if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                         killAllTasks();
                     }
@@ -421,7 +421,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * release task group
      *
-     * @param taskInstance
      */
     public void releaseTaskGroup(TaskInstance taskInstance) {
         logger.info("Release task group");
@@ -448,7 +447,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * crate new task instance to retry, different objects from the original
      *
-     * @param taskInstance
      */
     private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
         if (!taskInstance.taskCanRetry()) {
@@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     }
 
     private boolean needComplementProcess() {
-        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
-            return true;
-        }
-        return false;
+        return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess();
     }
 
     /**
@@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * clone a new taskInstance for retry and reset some logic fields
      *
-     * @return
+     * @return taskInstance
      */
     public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
         TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1097,7 +1092,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * clone a new taskInstance for tolerant and reset some logic fields
      *
-     * @return
+     * @return taskInstance
      */
     public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
         TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@@ -1119,9 +1114,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * new a taskInstance
      *
-     * @param processInstance
-     * @param taskNode
-     * @return
+     * @param processInstance process instance
+     * @param taskNode task node
+     * @return task instance
      */
     public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
         TaskInstance taskInstance = new TaskInstance();
@@ -1445,9 +1440,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             long taskCode = Long.parseLong(dependNodeName);
             Integer taskInstanceId = completeTaskMap.get(taskCode);
             TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
-            if (depTaskState.isFailure()) {
-                return false;
-            }
+            return !depTaskState.isFailure();
         }
         return true;
     }