You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/13 14:47:00 UTC
[dolphinscheduler] branch dev updated: [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 99b8ec6 [Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
99b8ec6 is described below
commit 99b8ec649213270df91b5a2808a26868c675a0d3
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Mon Dec 13 22:46:48 2021 +0800
[Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)
Co-authored-by: caishunfeng <53...@qq.com>
---
.../dolphinscheduler/dao/utils/DagHelper.java | 9 ++++----
.../master/runner/task/SwitchTaskProcessor.java | 25 ++++++++++++++++++++--
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index d419505..e7404fe 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -50,7 +50,6 @@ public class DagHelper {
private static final Logger logger = LoggerFactory.getLogger(DagHelper.class);
-
/**
* generate flow node relation list by task node list;
* Edges that are not in the task Node List will not be added to the result
@@ -135,7 +134,6 @@ public class DagHelper {
return destTaskNodeList;
}
-
/**
* find all the nodes that depended on the start node
*
@@ -160,7 +158,6 @@ public class DagHelper {
return resultList;
}
-
/**
* find all nodes that start nodes depend on.
*
@@ -310,6 +307,10 @@ public class DagHelper {
}
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
+ if (taskNode == null) {
+ logger.error("taskNode {} is null, please check dag", subsequent);
+ continue;
+ }
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList);
continue;
@@ -343,7 +344,6 @@ public class DagHelper {
return true;
}
-
/**
* parse condition task find the branch process
* set skip flag for another one.
@@ -443,7 +443,6 @@ public class DagHelper {
}
}
-
/***
* build dag graph
* @param processDag processDag
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 5378649..0fa6e5d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
@@ -75,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
- taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(),
+ taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
@@ -176,7 +177,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
switchParameters.setResultConditionLocation(finalConditionLocation);
taskInstance.setSwitchDependency(switchParameters);
- logger.info("the switch task depend result : {}", conditionResult);
+ if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) {
+ conditionResult = DependResult.FAILED;
+ logger.error("the switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, finalConditionLocation);
+ return true;
+ }
+
+ logger.info("the switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation);
return true;
}
@@ -221,4 +228,18 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return content;
}
+ /**
+ * check whether switch result is valid
+ */
+ private boolean isValidSwitchResult(SwitchResultVo switchResult) {
+ if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
+ return false;
+ }
+ for (String nextNode : switchResult.getNextNode()) {
+ if (StringUtils.isEmpty(nextNode)) {
+ return false;
+ }
+ }
+ return true;
+ }
}