You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/23 16:35:28 UTC
[dolphinscheduler] branch dev updated:
[Feature-#5273][server-master] Task node of SWITCH (#5922)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2e1768a [Feature-#5273][server-master] Task node of SWITCH (#5922)
2e1768a is described below
commit 2e1768aae6100bb05804c29c974bfaa274e2636f
Author: wangxj3 <85...@qq.com>
AuthorDate: Tue Aug 24 00:35:18 2021 +0800
[Feature-#5273][server-master] Task node of SWITCH (#5922)
Co-authored-by: wangxj <wangxj31>
---
.../dolphinscheduler/api/utils/CheckUtils.java | 45 ++----
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../dolphinscheduler/common/enums/TaskType.java | 4 +-
.../dolphinscheduler/common/model/TaskNode.java | 19 ++-
.../common/task/switchtask/SwitchParameters.java | 91 +++++++++++
.../common/task/switchtask/SwitchResultVo.java | 49 ++++++
.../common/utils/TaskParametersUtils.java | 3 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 25 +++
.../dolphinscheduler/dao/utils/DagHelper.java | 50 +++++-
.../dolphinscheduler/dao/utils/DagHelperTest.java | 99 +++++++++++-
.../master/runner/MasterBaseTaskExecThread.java | 15 +-
.../server/master/runner/MasterExecThread.java | 2 +
.../server/master/runner/SwitchTaskExecThread.java | 180 +++++++++++++++++++++
.../server/utils/SwitchTaskUtils.java | 38 +++++
.../server/master/SwitchTaskTest.java | 167 +++++++++++++++++++
.../service/process/ProcessService.java | 1 +
.../service/process/ProcessServiceTest.java | 14 +-
pom.xml | 1 +
18 files changed, 765 insertions(+), 39 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
index aca9771..ad2f574 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
@@ -43,8 +43,7 @@ public class CheckUtils {
/**
* check username
*
- * @param userName
- * user name
+ * @param userName user name
* @return true if user name regex valid,otherwise return false
*/
public static boolean checkUserName(String userName) {
@@ -54,8 +53,7 @@ public class CheckUtils {
/**
* check email
*
- * @param email
- * email
+ * @param email email
* @return true if email regex valid, otherwise return false
*/
public static boolean checkEmail(String email) {
@@ -69,8 +67,7 @@ public class CheckUtils {
/**
* check project description
*
- * @param desc
- * desc
+ * @param desc desc
* @return true if description regex valid, otherwise return false
*/
public static Map<String, Object> checkDesc(String desc) {
@@ -78,7 +75,7 @@ public class CheckUtils {
if (StringUtils.isNotEmpty(desc) && desc.length() > 200) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG,
- MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
+ MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
@@ -88,8 +85,7 @@ public class CheckUtils {
/**
* check extra info
*
- * @param otherParams
- * other parames
+ * @param otherParams other parameters
* @return true if other parameters are valid, otherwise return false
*/
public static boolean checkOtherParams(String otherParams) {
@@ -99,8 +95,7 @@ public class CheckUtils {
/**
* check password
*
- * @param password
- * password
+ * @param password password
* @return true if password regex valid, otherwise return false
*/
public static boolean checkPassword(String password) {
@@ -110,8 +105,7 @@ public class CheckUtils {
/**
* check phone phone can be empty.
*
- * @param phone
- * phone
+ * @param phone phone
* @return true if phone regex valid, otherwise return false
*/
public static boolean checkPhone(String phone) {
@@ -121,8 +115,7 @@ public class CheckUtils {
/**
* check task node parameter
*
- * @param taskNode
- * TaskNode
+ * @param taskNode TaskNode
* @return true if task node parameters are valid, otherwise return false
*/
public static boolean checkTaskNodeParameters(TaskNode taskNode) {
@@ -133,6 +126,8 @@ public class CheckUtils {
}
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence());
+ } else if (TaskType.SWITCH.getDesc().equalsIgnoreCase(taskType)) {
+ abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getSwitchResult());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams());
}
@@ -147,28 +142,22 @@ public class CheckUtils {
/**
* check params
*
- * @param userName
- * user name
- * @param password
- * password
- * @param email
- * email
- * @param phone
- * phone
+ * @param userName user name
+ * @param password password
+ * @param email email
+ * @param phone phone
* @return true if user parameters are valid, other return false
*/
public static boolean checkUserParams(String userName, String password, String email, String phone) {
return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password)
- && CheckUtils.checkPhone(phone);
+ && CheckUtils.checkPhone(phone);
}
/**
* regex check
*
- * @param str
- * input string
- * @param pattern
- * regex pattern
+ * @param str input string
+ * @param pattern regex pattern
* @return true if regex pattern is right, otherwise return false
*/
private static boolean regexChecks(String str, Pattern pattern) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 53645b7..e2b8a0c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -776,6 +776,7 @@ public final class Constants {
public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String CONDITION_RESULT = "conditionResult";
+ public static final String SWITCH_RESULT = "switchResult";
public static final String DEPENDENCE = "dependence";
public static final String TASK_TYPE = "taskType";
public static final String TASK_LIST = "taskList";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
index d0842e4..3792368 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
@@ -51,7 +51,9 @@ public enum TaskType {
DATAX(10, "DATAX"),
CONDITIONS(11, "CONDITIONS"),
SQOOP(12, "SQOOP"),
- WATERDROP(13, "WATERDROP");
+ WATERDROP(13, "WATERDROP"),
+ SWITCH(14, "SWITCH"),
+ ;
TaskType(int code, String desc) {
this.code = code;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index b9c5a28..2e9262d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.Constants;
@@ -33,7 +34,6 @@ import java.util.Objects;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
public class TaskNode {
/**
@@ -129,6 +129,10 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionResult;
+ @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+ @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+ private String switchResult;
+
/**
* task instance priority
*/
@@ -365,6 +369,10 @@ public class TaskNode {
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType());
}
+    /**
+     * Tells whether this node is a SWITCH task; the node type is compared case-insensitively.
+     *
+     * @return true if the node type equals the SWITCH task type description
+     */
+    public boolean isSwitchTask() {
+        // compare against getDesc(), as isConditionsTask() does, rather than the enum's toString()
+        return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.getType());
+    }
+
public List<PreviousTaskNode> getPreTaskNodeList() {
return preTaskNodeList;
}
@@ -380,6 +388,7 @@ public class TaskNode {
}
taskParams.put(Constants.CONDITION_RESULT, this.conditionResult);
taskParams.put(Constants.DEPENDENCE, this.dependence);
+ taskParams.put(Constants.SWITCH_RESULT, this.switchResult);
return JSONUtils.toJsonString(taskParams);
}
@@ -417,4 +426,12 @@ public class TaskNode {
+ ", delayTime=" + delayTime
+ '}';
}
+
+ /**
+  * @return the raw switch-result string stored on this node
+  */
+ public String getSwitchResult() {
+ return switchResult;
+ }
+
+ /**
+  * @param switchResult the raw switch-result string to store on this node
+  */
+ public void setSwitchResult(String switchResult) {
+ this.switchResult = switchResult;
+ }
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
new file mode 100644
index 0000000..dc59795
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task.switchtask;
+
+import org.apache.dolphinscheduler.common.enums.DependentRelation;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parameters of a SWITCH task node.
+ *
+ * <p>{@code dependTaskList} holds the candidate branches and
+ * {@code resultConditionLocation} is the index into that list of the branch
+ * whose next nodes are run. {@code nextNode} is presumably the default
+ * branch target (TODO confirm against the switch executor).
+ */
+public class SwitchParameters extends AbstractParameters {
+
+    private DependentRelation dependRelation;
+    private String relation;
+    private List<String> nextNode;
+    private int resultConditionLocation;
+    private List<SwitchResultVo> dependTaskList;
+
+    /**
+     * No validation is performed for switch parameters.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean checkParameters() {
+        return true;
+    }
+
+    /**
+     * A switch task references no resource files.
+     *
+     * @return a new empty list
+     */
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return new ArrayList<>();
+    }
+
+    public DependentRelation getDependRelation() {
+        return dependRelation;
+    }
+
+    public void setDependRelation(DependentRelation dependRelation) {
+        this.dependRelation = dependRelation;
+    }
+
+    public int getResultConditionLocation() {
+        return resultConditionLocation;
+    }
+
+    public void setResultConditionLocation(int resultConditionLocation) {
+        this.resultConditionLocation = resultConditionLocation;
+    }
+
+    public String getRelation() {
+        return relation;
+    }
+
+    public void setRelation(String relation) {
+        this.relation = relation;
+    }
+
+    public List<SwitchResultVo> getDependTaskList() {
+        return dependTaskList;
+    }
+
+    public void setDependTaskList(List<SwitchResultVo> dependTaskList) {
+        this.dependTaskList = dependTaskList;
+    }
+
+    public List<String> getNextNode() {
+        return nextNode;
+    }
+
+    /**
+     * Sets the next node(s) from either a single node name or a list of names.
+     *
+     * @param nextNode a {@code String}, a {@code List} of node names, or {@code null}
+     * @throws ClassCastException if the value is neither a String nor a List
+     */
+    public void setNextNode(Object nextNode) {
+        if (nextNode == null) {
+            this.nextNode = null;
+            return;
+        }
+        List<String> nextNodeList = new ArrayList<>();
+        if (nextNode instanceof String) {
+            nextNodeList.add((String) nextNode);
+        } else {
+            // accept any List implementation, not only java.util.ArrayList,
+            // and copy it so the caller's list is not aliased
+            for (Object item : (List<?>) nextNode) {
+                nextNodeList.add(item == null ? null : String.valueOf(item));
+            }
+        }
+        this.nextNode = nextNodeList;
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
new file mode 100644
index 0000000..558a6f1
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task.switchtask;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One branch of a SWITCH task: a condition expression and the node(s) to run for it.
+ */
+public class SwitchResultVo {
+
+    private String condition;
+    private List<String> nextNode;
+
+    public String getCondition() {
+        return condition;
+    }
+
+    public void setCondition(String condition) {
+        this.condition = condition;
+    }
+
+    public List<String> getNextNode() {
+        return nextNode;
+    }
+
+    /**
+     * Sets the next node(s) from either a single node name or a list of names.
+     *
+     * @param nextNode a {@code String}, a {@code List} of node names, or {@code null}
+     * @throws ClassCastException if the value is neither a String nor a List
+     */
+    public void setNextNode(Object nextNode) {
+        if (nextNode == null) {
+            this.nextNode = null;
+            return;
+        }
+        List<String> nextNodeList = new ArrayList<>();
+        if (nextNode instanceof String) {
+            nextNodeList.add((String) nextNode);
+        } else {
+            // accept any List implementation, not only java.util.ArrayList,
+            // and copy it so the caller's list is not aliased
+            for (Object item : (List<?>) nextNode) {
+                nextNodeList.add(item == null ? null : String.valueOf(item));
+            }
+        }
+        this.nextNode = nextNodeList;
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
index 740635c..f5e9dec 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, ConditionsParameters.class);
case "SQOOP":
return JSONUtils.parseObject(parameter, SqoopParameters.class);
+ case "SWITCH":
+ return JSONUtils.parseObject(parameter, SwitchParameters.class);
default:
logger.error("not support task type: {}", taskType);
return null;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index aa87272..2be4ad6 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
@@ -175,6 +176,12 @@ public class TaskInstance implements Serializable {
private DependentParameters dependency;
/**
+ * switch dependency
+ */
+ @TableField(exist = false)
+ private SwitchParameters switchDependency;
+
+ /**
* duration
*/
@TableField(exist = false)
@@ -426,6 +433,20 @@ public class TaskInstance implements Serializable {
this.dependency = dependency;
}
+    /**
+     * Lazily parses the switch parameters out of the task params JSON
+     * (the {@code Constants.SWITCH_RESULT} entry) and caches the result.
+     *
+     * @return the parsed switch parameters, or null when none could be parsed
+     */
+    public SwitchParameters getSwitchDependency() {
+        if (this.switchDependency == null) {
+            Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
+            // NOTE(review): toMap presumably returns null for empty or unparseable params; guard against it
+            if (taskParamsMap != null) {
+                this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class);
+            }
+        }
+        return this.switchDependency;
+    }
+
+    /**
+     * Serializes the given switch parameters into the task params JSON under
+     * {@code Constants.SWITCH_RESULT} and updates the cached parsed value.
+     *
+     * @param switchDependency switch parameters to store
+     */
+    public void setSwitchDependency(SwitchParameters switchDependency) {
+        Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
+        taskParamsMap.put(Constants.SWITCH_RESULT, JSONUtils.toJsonString(switchDependency));
+        this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+        // keep the cache used by getSwitchDependency() in step with taskParams;
+        // otherwise a getter call made before this setter keeps returning the old value
+        this.switchDependency = switchDependency;
+    }
+
public Flag getFlag() {
return flag;
}
@@ -510,6 +531,10 @@ public class TaskInstance implements Serializable {
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType);
}
+ /**
+  * Tells whether this task instance is a SWITCH task (case-insensitive match on the task type).
+  *
+  * @return true if the task type equals the SWITCH task type description
+  */
+ public boolean isSwitchTask() {
+ return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType);
+ }
+
/**
* determine if you can try again
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 025b825..de27f17 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.utils;
+package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -281,6 +283,9 @@ public class DagHelper {
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
+ } else if (dag.getNode(preNodeName).isSwitchTask()) {
+ List<String> conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
+ startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
}
@@ -356,6 +361,49 @@ public class DagHelper {
}
/**
+     * Resolve the nodes that follow a completed SWITCH task and flag the
+     * nodes of the branches that were not chosen as skipped.
+     *
+     * @param nodeName name of the switch node in the dag
+     * @param skipTaskNodeList receives the nodes flagged as skipped
+     * @param dag process dag
+     * @param completeTaskList completed task instances keyed by node name
+     * @return next nodes of the chosen branch; an empty list when the node is
+     *         not a switch task or has not completed yet
+     */
+    public static List<String> parseSwitchTask(String nodeName,
+                                               Map<String, TaskNode> skipTaskNodeList,
+                                               DAG<String, TaskNode, TaskNodeRelation> dag,
+                                               Map<String, TaskInstance> completeTaskList) {
+        TaskNode switchNode = dag.getNode(nodeName);
+        boolean resolvable = switchNode.isSwitchTask() && completeTaskList.containsKey(nodeName);
+        if (!resolvable) {
+            return new ArrayList<>();
+        }
+        return skipTaskNode4Switch(switchNode, skipTaskNodeList, completeTaskList, dag);
+    }
+
+    /**
+     * Returns the next nodes of the branch selected by the switch task and sets
+     * the skip flag on the first next node of every other branch.
+     *
+     * <p>The branch list belongs to the switch parameters cached on the task
+     * instance, so it is only read here and never modified; removing the
+     * selected entry would make a repeated call pick a different branch.
+     *
+     * @param taskNode the switch node
+     * @param skipTaskNodeList receives the nodes flagged as skipped
+     * @param completeTaskList completed task instances keyed by node name
+     * @param dag process dag
+     * @return next nodes of the selected branch, never null
+     */
+    private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
+                                                    Map<String, TaskInstance> completeTaskList,
+                                                    DAG<String, TaskNode, TaskNodeRelation> dag) {
+        SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency();
+        int resultConditionLocation = switchParameters.getResultConditionLocation();
+        List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
+        List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
+        if (CollectionUtils.isEmpty(switchTaskList)) {
+            switchTaskList = new ArrayList<>();
+        }
+        for (int i = 0; i < conditionResultVoList.size(); i++) {
+            if (i == resultConditionLocation) {
+                // the selected branch is the one that runs
+                continue;
+            }
+            List<String> otherBranchNodes = conditionResultVoList.get(i).getNextNode();
+            if (CollectionUtils.isEmpty(otherBranchNodes)) {
+                continue;
+            }
+            setTaskNodeSkip(otherBranchNodes.get(0), dag, completeTaskList, skipTaskNodeList);
+        }
+        return switchTaskList;
+    }
+
+ /**
* set task node and the post nodes skip flag
*/
private static void setTaskNodeSkip(String skipNodeName,
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index c486ed9..18c17fe 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -251,6 +253,10 @@ public class DagHelperTest {
skipNodeList.clear();
completeTaskList.remove("3");
taskInstance = new TaskInstance();
+
+ Map<String, Object> taskParamsMap = new HashMap<>();
+ taskParamsMap.put(Constants.SWITCH_RESULT, "");
+ taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
taskInstance.setState(ExecutionStatus.FAILURE);
completeTaskList.put("3", taskInstance);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
@@ -259,6 +265,17 @@ public class DagHelperTest {
Assert.assertEquals(2, skipNodeList.size());
Assert.assertTrue(skipNodeList.containsKey("5"));
Assert.assertTrue(skipNodeList.containsKey("7"));
+
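+ // dag from generateDag2(): node 1 is a SWITCH task followed by SHELL nodes
+ // complete: 1, with the switch result location left at its default (0)
+ // expect exactly one post node to be returned for node 1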
+ dag = generateDag2();
+ skipNodeList.clear();
+ completeTaskList.clear();
+ taskInstance.setSwitchDependency(getSwitchNode());
+ completeTaskList.put("1", taskInstance);
+ postNodes = DagHelper.parsePostNodes("1", skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
}
/**
@@ -286,7 +303,6 @@ public class DagHelperTest {
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
-
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
@@ -351,6 +367,87 @@ public class DagHelperTest {
return DagHelper.buildDagGraph(processDag);
}
+    /**
+     * Builds a small DAG around a SWITCH node:
+     * 0 (no pre tasks)
+     * 1 (switch) -> 2
+     * 1 (switch) -> 4
+     * 1 (switch) -> 5
+     *
+     * @return dag
+     * @throws IOException declared for the JSON and DAG helper calls used while building the graph
+     */
+    private DAG<String, TaskNode, TaskNodeRelation> generateDag2() throws IOException {
+        List<TaskNode> taskNodeList = new ArrayList<>();
+
+        TaskNode node = new TaskNode();
+        node.setId("0");
+        node.setName("0");
+        node.setType("SHELL");
+        taskNodeList.add(node);
+
+        TaskNode node1 = new TaskNode();
+        node1.setId("1");
+        node1.setName("1");
+        node1.setType("switch");
+        node1.setDependence(JSONUtils.toJsonString(getSwitchNode()));
+        taskNodeList.add(node1);
+
+        TaskNode node2 = new TaskNode();
+        node2.setId("2");
+        node2.setName("2");
+        node2.setType("SHELL");
+        List<String> dep2 = new ArrayList<>();
+        dep2.add("1");
+        node2.setPreTasks(JSONUtils.toJsonString(dep2));
+        taskNodeList.add(node2);
+
+        TaskNode node4 = new TaskNode();
+        node4.setId("4");
+        node4.setName("4");
+        node4.setType("SHELL");
+        List<String> dep4 = new ArrayList<>();
+        dep4.add("1");
+        node4.setPreTasks(JSONUtils.toJsonString(dep4));
+        taskNodeList.add(node4);
+
+        // node 5 needs its own id/name; registering it as "4" again duplicated node 4
+        TaskNode node5 = new TaskNode();
+        node5.setId("5");
+        node5.setName("5");
+        node5.setType("SHELL");
+        List<String> dep5 = new ArrayList<>();
+        dep5.add("1");
+        node5.setPreTasks(JSONUtils.toJsonString(dep5));
+        taskNodeList.add(node5);
+
+        List<String> startNodes = new ArrayList<>();
+        List<String> recoveryNodes = new ArrayList<>();
+        List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
+                startNodes, recoveryNodes, TaskDependType.TASK_POST);
+        List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
+        ProcessDag processDag = new ProcessDag();
+        processDag.setEdges(taskNodeRelations);
+        processDag.setNodes(destTaskNodeList);
+        return DagHelper.buildDagGraph(processDag);
+    }
+
+    /**
+     * Builds switch parameters with two branches (next nodes "2" and "4"),
+     * next node "5" set on the parameters themselves, and relation "AND".
+     *
+     * @return switch parameters used by the switch-task tests
+     */
+    private SwitchParameters getSwitchNode() {
+        SwitchResultVo firstBranch = new SwitchResultVo();
+        firstBranch.setCondition(" 2 == 1");
+        firstBranch.setNextNode("2");
+
+        SwitchResultVo secondBranch = new SwitchResultVo();
+        secondBranch.setCondition(" 2 == 2");
+        secondBranch.setNextNode("4");
+
+        List<SwitchResultVo> branches = new ArrayList<>();
+        branches.add(firstBranch);
+        branches.add(secondBranch);
+
+        SwitchParameters switchParameters = new SwitchParameters();
+        switchParameters.setDependTaskList(branches);
+        switchParameters.setNextNode("5");
+        switchParameters.setRelation("AND");
+        return switchParameters;
+    }
+
@Test
public void testBuildDagGraph() {
String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\","
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index cfd8a9a..da62982 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.runner;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -201,7 +203,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
try {
if (taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
- || taskInstance.isSubProcess()) {
+ || taskInstance.isSubProcess()
+ || taskInstance.isSwitchTask()
+ ) {
return true;
}
if (taskInstance.getState().typeIsFinished()) {
@@ -321,4 +325,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
+
+ /**
+  * Builds the thread name for this task.
+  *
+  * <p>Side effect: the {@code logger} field is replaced with a logger whose name is
+  * built from the process definition code and version, the process instance id
+  * and the task instance id, so later log calls on this object use that logger.
+  *
+  * @return the task app id of this task instance, formatted with {@code Constants.TASK_LOG_INFO_FORMAT}
+  */
+ protected String getThreadName() {
+ logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId()));
+ return String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 1863087..856b833 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -433,6 +433,8 @@ public class MasterExecThread implements Runnable {
abstractExecThread = new DependentTaskExecThread(taskInstance);
} else if (taskInstance.isConditionsTask()) {
abstractExecThread = new ConditionsTaskExecThread(taskInstance);
+ } else if (taskInstance.isSwitchTask()) {
+ abstractExecThread = new SwitchTaskExecThread(taskInstance);
} else {
abstractExecThread = new MasterTaskExecThread(taskInstance);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java
new file mode 100644
index 0000000..f9e7f42
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Master-side executor for a SWITCH task node.
+ *
+ * <p>The branch conditions configured in the task's {@link SwitchParameters} are evaluated in
+ * order; the index of the first branch that matches (or of the default branch appended at the
+ * end) is stored with {@code setResultConditionLocation} on the parameters.
+ */
+public class SwitchTaskExecThread extends MasterBaseTaskExecThread {
+
+    /**
+     * regular expression locating a <code>${name}</code> placeholder, optionally surrounded by
+     * quote characters; capture group 1 is the parameter name
+     */
+    protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
+
+    /**
+     * complete task map (task name -&gt; state); filled in {@link #waitTaskQuit()} and not read
+     * anywhere else in this class
+     */
+    private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
+
+    /**
+     * switch result: SUCCESS unless evaluating a condition threw
+     */
+    private DependResult conditionResult;
+
+    /**
+     * constructor of SwitchTaskExecThread
+     *
+     * @param taskInstance task instance; its start time is set to the current time
+     */
+    public SwitchTaskExecThread(TaskInstance taskInstance) {
+        super(taskInstance);
+        taskInstance.setStartTime(new Date());
+    }
+
+    /**
+     * Submit the task, mark it as running, evaluate the switch conditions and persist the
+     * resulting state.
+     *
+     * <p>Any exception raised on the way is logged and swallowed; in that case the remaining
+     * steps (including {@link #updateTaskState()}) are not executed.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public Boolean submitWaitComplete() {
+        try {
+            this.taskInstance = submit();
+            logger.info("taskInstance submit end");
+            Thread.currentThread().setName(getThreadName());
+            initTaskParameters();
+            logger.info("switch task start");
+            waitTaskQuit();
+            updateTaskState();
+        } catch (Exception e) {
+            logger.error("switch task run exception", e);
+        }
+        return true;
+    }
+
+    /**
+     * Evaluate the configured branches in order and record which one was selected.
+     *
+     * <p>A default branch (no condition, next node taken from the switch parameters) is appended
+     * after the configured ones. Evaluation stops at the first branch whose condition is empty or
+     * evaluates to {@code true}. If evaluating a condition throws, {@link #conditionResult} becomes
+     * FAILED and the recorded location stays at the default branch.
+     */
+    private void waitTaskQuit() {
+        List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
+                taskInstance.getProcessInstanceId()
+        );
+        for (TaskInstance task : taskInstances) {
+            completeTaskList.putIfAbsent(task.getName(), task.getState());
+        }
+
+        SwitchParameters switchParameters = taskInstance.getSwitchDependency();
+        // Work on a copy: the configured list may be null or unmodifiable, and appending the
+        // default branch must not depend on the caller's list implementation.
+        List<SwitchResultVo> switchResultVos = new java.util.ArrayList<>();
+        if (switchParameters.getDependTaskList() != null) {
+            switchResultVos.addAll(switchParameters.getDependTaskList());
+        }
+        SwitchResultVo defaultBranch = new SwitchResultVo();
+        defaultBranch.setNextNode(switchParameters.getNextNode());
+        switchResultVos.add(defaultBranch);
+
+        // the default branch is selected unless an earlier branch matches
+        int finalConditionLocation = switchResultVos.size() - 1;
+        conditionResult = DependResult.SUCCESS;
+        for (int i = 0; i < switchResultVos.size(); i++) {
+            SwitchResultVo info = switchResultVos.get(i);
+            logger.info("the {} execution ", (i + 1));
+            logger.info("original condition sentence:{}", info.getCondition());
+            if (StringUtils.isEmpty(info.getCondition())) {
+                // a branch without a condition always matches
+                finalConditionLocation = i;
+                break;
+            }
+            String content = setTaskParams(info.getCondition().replaceAll("'", "\""), rgex);
+            logger.info("format condition sentence:{}", content);
+            boolean result;
+            try {
+                result = SwitchTaskUtils.evaluate(content);
+            } catch (Exception e) {
+                // keep the cause in the log; the task is reported as failed
+                logger.error("error sentence : {}", content, e);
+                conditionResult = DependResult.FAILED;
+                break;
+            }
+            logger.info("condition result : {}", result);
+            if (result) {
+                finalConditionLocation = i;
+                break;
+            }
+        }
+        switchParameters.setDependTaskList(switchResultVos);
+        switchParameters.setResultConditionLocation(finalConditionLocation);
+        taskInstance.setSwitchDependency(switchParameters);
+
+        logger.info("the switch task depend result : {}", conditionResult);
+    }
+
+    /**
+     * update task state: KILL when cancelled, otherwise SUCCESS or FAILURE depending on
+     * {@link #conditionResult}; the end time is set and the task instance is persisted
+     */
+    private void updateTaskState() {
+        ExecutionStatus status;
+        if (this.cancel) {
+            status = ExecutionStatus.KILL;
+        } else {
+            status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
+        }
+        taskInstance.setEndTime(new Date());
+        taskInstance.setState(status);
+        processService.updateTaskInstance(taskInstance);
+    }
+
+    /**
+     * set log path, start time, host and RUNNING_EXECUTION state on the task instance and save it
+     */
+    private void initTaskParameters() {
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId()));
+        this.taskInstance.setStartTime(new Date());
+        this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+        this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        this.processService.saveTaskInstance(taskInstance);
+    }
+
+    /**
+     * Replace <code>${name}</code> placeholders in a condition with parameter values.
+     *
+     * <p>Values are looked up in the process instance's global parameters; when the task's var
+     * pool is not empty, the lookup map is the var pool with the global parameters put on top of
+     * it. Values that are not numbers are wrapped in double quotes.
+     *
+     * <p>NOTE(review): the pattern also matches quote characters around a placeholder, but only the
+     * <code>${name}</code> text itself is replaced - confirm this is intended for quoted
+     * placeholders.
+     *
+     * @param content condition text
+     * @param rgex    regular expression whose group 1 captures the parameter name
+     * @return the condition with placeholders replaced, or an empty string as soon as a
+     *         referenced parameter is not defined
+     */
+    public String setTaskParams(String content, String rgex) {
+        Pattern pattern = Pattern.compile(rgex);
+        // the matcher keeps scanning the original text even though content is reassigned below
+        Matcher m = pattern.matcher(content);
+        Map<String, Property> globalParams = JSONUtils.toList(processInstance.getGlobalParams(), Property.class)
+                .stream()
+                .collect(Collectors.toMap(Property::getProp, property -> property));
+        Map<String, Property> varParams = JSONUtils.toList(taskInstance.getVarPool(), Property.class)
+                .stream()
+                .collect(Collectors.toMap(Property::getProp, property -> property));
+        if (varParams.size() > 0) {
+            varParams.putAll(globalParams);
+            globalParams = varParams;
+        }
+        while (m.find()) {
+            String paramName = m.group(1);
+            Property property = globalParams.get(paramName);
+            if (property == null) {
+                return "";
+            }
+            String value = property.getValue();
+            if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) {
+                value = "\"" + value + "\"";
+            }
+            logger.info("paramName:{},paramValue:{}", paramName, value);
+            content = content.replace("${" + paramName + "}", value);
+        }
+        return content;
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java
new file mode 100644
index 0000000..6320feb
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.utils;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+
+/**
+ * Helper that evaluates the condition expressions of SWITCH task nodes with the JVM's
+ * script engine registered under the name {@code js}.
+ *
+ * <p>NOTE(review): the expression is executed as script code, so it must only come from
+ * trusted workflow definitions.
+ */
+public class SwitchTaskUtils {
+
+    /**
+     * shared script engine; {@code null} when the running JVM provides no engine named "js"
+     */
+    private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("js");
+
+    /**
+     * Evaluate a boolean expression.
+     *
+     * <p>The method is synchronized because the single engine instance is shared by all callers
+     * and script engines are not guaranteed to support concurrent evaluation.
+     *
+     * @param expression expression to evaluate, e.g. {@code 2 == 2}
+     * @return the boolean value the expression evaluates to
+     * @throws ScriptException if no "js" engine is available, the expression cannot be evaluated,
+     *                         or its result is not a boolean
+     */
+    public static synchronized boolean evaluate(String expression) throws ScriptException {
+        if (ENGINE == null) {
+            throw new ScriptException("no script engine named \"js\" is available to evaluate: " + expression);
+        }
+        Object result = ENGINE.eval(expression);
+        if (!(result instanceof Boolean)) {
+            // covers null (e.g. an empty expression) as well as numbers, strings, ...
+            throw new ScriptException("expression does not evaluate to a boolean: " + expression
+                    + " (result: " + result + ")");
+        }
+        return (Boolean) result;
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
new file mode 100644
index 0000000..0c2d74a
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.SwitchTaskExecThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * Unit test running a {@link SwitchTaskExecThread} against a mocked {@link ProcessService}.
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class SwitchTaskTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(SwitchTaskTest.class);
+
+    /**
+     * value of TaskNode.runFlag for a task that can be run normally
+     */
+    public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
+
+    private ProcessService processService;
+
+    private ProcessInstance processInstance;
+
+    /**
+     * Wire a mocked application context that serves the master config and the mocked
+     * process service, and register the process instance used by the test.
+     */
+    @Before
+    public void before() {
+        ApplicationContext context = Mockito.mock(ApplicationContext.class);
+        new SpringApplicationContext().setApplicationContext(context);
+
+        MasterConfig masterConfig = new MasterConfig();
+        Mockito.when(context.getBean(MasterConfig.class)).thenReturn(masterConfig);
+        masterConfig.setMasterTaskCommitRetryTimes(3);
+        masterConfig.setMasterTaskCommitInterval(1000);
+
+        processService = Mockito.mock(ProcessService.class);
+        Mockito.when(context.getBean(ProcessService.class)).thenReturn(processService);
+
+        processInstance = buildProcessInstance();
+        int processInstanceId = processInstance.getId();
+        Mockito.when(processService.findProcessInstanceById(processInstanceId)).thenReturn(processInstance);
+    }
+
+    /**
+     * Create the switch task instance and stub every process service call the exec thread makes.
+     *
+     * @param expectResult not used by this helper
+     * @return the prepared task instance
+     */
+    private TaskInstance prepareTaskInstance(ExecutionStatus expectResult) {
+        TaskDefinition definition = new TaskDefinition();
+        definition.setTimeoutFlag(TimeoutFlag.OPEN);
+        definition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
+        definition.setTimeout(0);
+        Mockito.when(processService.findTaskDefinition(1L, 1)).thenReturn(definition);
+
+        TaskInstance instance = buildTaskInstance(buildSwitchParameters(), processInstance);
+
+        // for MasterBaseTaskExecThread.submit
+        Mockito.when(processService.submitTask(instance)).thenReturn(instance);
+        // for MasterBaseTaskExecThread.call
+        Mockito.when(processService.findTaskInstanceById(instance.getId())).thenReturn(instance);
+        // for SwitchTaskExecThread.initTaskParameters
+        Mockito.when(processService.saveTaskInstance(instance)).thenReturn(true);
+        // for SwitchTaskExecThread.updateTaskState
+        Mockito.when(processService.updateTaskInstance(instance)).thenReturn(true);
+
+        return instance;
+    }
+
+    @Test
+    public void testExe() throws Exception {
+        TaskInstance instance = prepareTaskInstance(ExecutionStatus.SUCCESS);
+        instance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        SwitchTaskExecThread execThread = new SwitchTaskExecThread(instance);
+        execThread.call();
+
+        ExecutionStatus finalState = execThread.getTaskInstance().getState();
+        Assert.assertEquals(ExecutionStatus.SUCCESS, finalState);
+    }
+
+    /**
+     * Build a single switch branch.
+     *
+     * @param condition condition sentence of the branch
+     * @param nextNode  node to run when the condition holds
+     * @return the branch
+     */
+    private SwitchResultVo branch(String condition, String nextNode) {
+        SwitchResultVo vo = new SwitchResultVo();
+        vo.setCondition(condition);
+        vo.setNextNode(nextNode);
+        return vo;
+    }
+
+    /**
+     * Switch parameters with three branches (only the second condition holds) and default node "t".
+     */
+    private SwitchParameters buildSwitchParameters() {
+        List<SwitchResultVo> branches = new ArrayList<>();
+        branches.add(branch(" 2 == 1", "t1"));
+        branches.add(branch(" 2 == 2", "t2"));
+        branches.add(branch(" 3 == 2", "t3"));
+
+        SwitchParameters parameters = new SwitchParameters();
+        parameters.setDependTaskList(branches);
+        parameters.setNextNode("t");
+        parameters.setRelation("AND");
+        return parameters;
+    }
+
+    /**
+     * Running process instance with id 1000 and process definition code 1.
+     */
+    private ProcessInstance buildProcessInstance() {
+        ProcessInstance instance = new ProcessInstance();
+        instance.setId(1000);
+        instance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        instance.setProcessDefinitionCode(1L);
+        return instance;
+    }
+
+    /**
+     * SWITCH task instance "C" (id 1000) bound to the given process instance.
+     *
+     * @param conditionsParameters switch parameters attached to the task
+     * @param processInstance      owning process instance
+     * @return the task instance
+     */
+    private TaskInstance buildTaskInstance(SwitchParameters conditionsParameters, ProcessInstance processInstance) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(Constants.SWITCH_RESULT, "");
+
+        TaskInstance instance = new TaskInstance();
+        instance.setId(1000);
+        instance.setTaskParams(JSONUtils.toJsonString(params));
+        instance.setSwitchDependency(conditionsParameters);
+        instance.setName("C");
+        instance.setTaskType("SWITCH");
+        instance.setProcessInstanceId(processInstance.getId());
+        instance.setTaskCode(1L);
+        instance.setTaskDefinitionVersion(1);
+        return instance;
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f7b5de3..ac3e78d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2458,6 +2458,7 @@ public class ProcessService {
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
v.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
+ v.setSwitchResult((String) taskParamsMap.get(Constants.SWITCH_RESULT));
v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 643dc09..d0a7351 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -424,12 +424,14 @@ public class ProcessServiceTest {
@Test
public void testGenProcessData() {
- String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null,"
- + "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0,"
- + "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\","
- + "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
- + "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null,"
- + "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
+ String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\","
+ + "\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,"
+ + "\"retryInterval\":0,\"params\":{},\"preTasks\":[\"unit-test\"],"
+ + "\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\",\"version\":0}],"
+ + "\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
+ + "\"switchResult\":null,\"taskInstancePriority\":null,\"workerGroup\":null,"
+ + "\"timeout\":{\"enable\":false,\"strategy\":null,\"interval\":0},\"delayTime\":0}],"
+ + "\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
diff --git a/pom.xml b/pom.xml
index 705a54b..522d9b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -992,6 +992,7 @@
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include>
+ <include>**/server/master/SwitchTaskTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/server/master/ParamsTest.java</include>
<include>**/server/master/SubProcessTaskTest.java</include>