You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/13 14:47:00 UTC

[dolphinscheduler] branch dev updated: [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 99b8ec6  [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
99b8ec6 is described below

commit 99b8ec649213270df91b5a2808a26868c675a0d3
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Mon Dec 13 22:46:48 2021 +0800

    [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../dolphinscheduler/dao/utils/DagHelper.java      |  9 ++++----
 .../master/runner/task/SwitchTaskProcessor.java    | 25 ++++++++++++++++++++--
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index d419505..e7404fe 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -50,7 +50,6 @@ public class DagHelper {
 
     private static final Logger logger = LoggerFactory.getLogger(DagHelper.class);
 
-
     /**
      * generate flow node relation list by task node list;
      * Edges that are not in the task Node List will not be added to the result
@@ -135,7 +134,6 @@ public class DagHelper {
         return destTaskNodeList;
     }
 
-
     /**
      * find all the nodes that depended on the start node
      *
@@ -160,7 +158,6 @@ public class DagHelper {
         return resultList;
     }
 
-
     /**
      * find all nodes that start nodes depend on.
      *
@@ -310,6 +307,10 @@ public class DagHelper {
         }
         for (String subsequent : startVertexes) {
             TaskNode taskNode = dag.getNode(subsequent);
+            if (taskNode == null) {
+                logger.error("taskNode {} is null, please check dag", subsequent);
+                continue;
+            }
             if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
                 setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList);
                 continue;
@@ -343,7 +344,6 @@ public class DagHelper {
         return true;
     }
 
-
     /**
      * parse condition task find the branch process
      * set skip flag for another one.
@@ -443,7 +443,6 @@ public class DagHelper {
         }
     }
 
-
     /***
      * build dag graph
      * @param processDag processDag
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 5378649..0fa6e5d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.util.Date;
@@ -75,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         taskDefinition = processService.findTaskDefinition(
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
-        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(),
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
                 taskInstance.getId()));
@@ -176,7 +177,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         switchParameters.setResultConditionLocation(finalConditionLocation);
         taskInstance.setSwitchDependency(switchParameters);
 
-        logger.info("the switch task depend result : {}", conditionResult);
+        if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) {
+            conditionResult = DependResult.FAILED;
+            logger.error("the switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, finalConditionLocation);
+            return true;
+        }
+
+        logger.info("the switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation);
         return true;
     }
 
@@ -221,4 +228,18 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return content;
     }
 
+    /**
+     * check whether switch result is valid
+     */
+    private boolean isValidSwitchResult(SwitchResultVo switchResult) {
+        if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
+            return false;
+        }
+        for (String nextNode : switchResult.getNextNode()) {
+            if (StringUtils.isEmpty(nextNode)) {
+                return false;
+            }
+        }
+        return true;
+    }
 }