You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/25 07:29:24 UTC

[dolphinscheduler] branch dev updated: [Fix-9717] The failure policy of the task flow takes effect (#9718)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7bcec7115a [Fix-9717] The failure policy of the task flow takes effect (#9718)
7bcec7115a is described below

commit 7bcec7115aa5967bed9208ee24a567899fb4f84d
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Mon Apr 25 15:29:18 2022 +0800

    [Fix-9717] The failure policy of the task flow takes effect (#9718)
    
    * Failure policy takes effect.
    
    * Coverage on New Code
    
    * correct description logic
    
    * Compatible with all scenarios
    
    * clearer logic
    
    Co-authored-by: WangJPLeo <wa...@whaleops.com>
---
 .../dolphinscheduler/dao/utils/DagHelper.java      | 39 +++++++++-------
 .../dolphinscheduler/dao/utils/DagHelperTest.java  | 52 ++++++++++++++++++++++
 .../master/runner/WorkflowExecuteThread.java       |  6 +--
 3 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 0531791423..a3bd022192 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -534,19 +536,7 @@ public class DagHelper {
     public static boolean haveConditionsAfterNode(String parentNodeCode,
                                                   DAG<String, TaskNode, TaskNodeRelation> dag
     ) {
-        boolean result = false;
-        Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
-        if (CollectionUtils.isEmpty(subsequentNodes)) {
-            return result;
-        }
-        for (String nodeCode : subsequentNodes) {
-            TaskNode taskNode = dag.getNode(nodeCode);
-            List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
-            if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
-                return true;
-            }
-        }
-        return result;
+        return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS);
     }
 
     /**
@@ -565,19 +555,38 @@ public class DagHelper {
         return false;
     }
 
+
     /**
      * is there have blocking node after the parent node
      */
     public static boolean haveBlockingAfterNode(String parentNodeCode,
                                                 DAG<String,TaskNode,TaskNodeRelation> dag) {
+        return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING);
+    }
+
+    /**
+     * is there have all node after the parent node
+     */
+    public static boolean haveAllNodeAfterNode(String parentNodeCode,
+                                               DAG<String,TaskNode,TaskNodeRelation> dag) {
+        return haveSubAfterNode(parentNodeCode, dag, null);
+    }
+
+    /**
+     * Whether there is a specified type of child node after the parent node
+     */
+    public static boolean haveSubAfterNode(String parentNodeCode,
+                                           DAG<String,TaskNode,TaskNodeRelation> dag, String filterNodeType) {
         Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
         if (CollectionUtils.isEmpty(subsequentNodes)) {
             return false;
         }
+        if (StringUtils.isBlank(filterNodeType)){
+            return true;
+        }
         for (String nodeName : subsequentNodes) {
             TaskNode taskNode = dag.getNode(nodeName);
-            List<String> preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class);
-            if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) {
+            if (taskNode.getType().equalsIgnoreCase(filterNodeType)){
                 return true;
             }
         }
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index 1d374d9951..e0dafeb5bf 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -48,6 +49,57 @@ import com.fasterxml.jackson.core.JsonProcessingException;
  * dag helper test
  */
 public class DagHelperTest {
+
+    @Test
+    public void testHaveSubAfterNode(){
+        String parentNodeCode = "5293789969856";
+        List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
+        TaskNodeRelation relation = new TaskNodeRelation();
+        relation.setStartNode("5293789969856");
+        relation.setEndNode("5293789969857");
+        taskNodeRelations.add(relation);
+
+        TaskNodeRelation relationNext = new TaskNodeRelation();
+        relationNext.setStartNode("5293789969856");
+        relationNext.setEndNode("5293789969858");
+        taskNodeRelations.add(relationNext);
+
+        List<TaskNode> taskNodes = new ArrayList<>();
+        TaskNode node = new TaskNode();
+        node.setCode(5293789969856L);
+        node.setType("SHELL");
+
+        TaskNode subNode = new TaskNode();
+        subNode.setCode(5293789969857L);
+        subNode.setType("BLOCKING");
+        subNode.setPreTasks("[5293789969856]");
+
+        TaskNode subNextNode = new TaskNode();
+        subNextNode.setCode(5293789969858L);
+        subNextNode.setType("CONDITIONS");
+        subNextNode.setPreTasks("[5293789969856]");
+
+        taskNodes.add(node);
+        taskNodes.add(subNode);
+        taskNodes.add(subNextNode);
+
+        ProcessDag processDag = new ProcessDag();
+        processDag.setEdges(taskNodeRelations);
+        processDag.setNodes(taskNodes);
+        DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
+        boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag);
+        Assert.assertTrue(canSubmit);
+
+        boolean haveBlocking = DagHelper.haveBlockingAfterNode(parentNodeCode, dag);
+        Assert.assertTrue(haveBlocking);
+
+        boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag);
+        Assert.assertTrue(haveConditions);
+
+        boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_DEPENDENT);
+        Assert.assertFalse(dependent);
+    }
+
     /**
      * test task node can submit
      *
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 979de3feb4..4d1cb296a7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -456,9 +456,9 @@ public class WorkflowExecuteThread {
             retryTaskInstance(taskInstance);
         } else if (taskInstance.getState().typeIsFailure()) {
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
-            if (taskInstance.isConditionsTask()
-                || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
-                || DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
+            // There are child nodes and the failure policy is: CONTINUE
+            if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
+                    && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
                 submitPostNode(Long.toString(taskInstance.getTaskCode()));
             } else {
                 errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());