You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/23 16:35:28 UTC

[dolphinscheduler] branch dev updated: [Feature-#5273][server-master] Task node of SWITCH (#5922)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2e1768a  [Feature-#5273][server-master] Task node of SWITCH (#5922)
2e1768a is described below

commit 2e1768aae6100bb05804c29c974bfaa274e2636f
Author: wangxj3 <85...@qq.com>
AuthorDate: Tue Aug 24 00:35:18 2021 +0800

    [Feature-#5273][server-master] Task node of SWITCH (#5922)
    
    
    
    Co-authored-by: wangxj <wangxj31>
---
 .../dolphinscheduler/api/utils/CheckUtils.java     |  45 ++----
 .../apache/dolphinscheduler/common/Constants.java  |   1 +
 .../dolphinscheduler/common/enums/TaskType.java    |   4 +-
 .../dolphinscheduler/common/model/TaskNode.java    |  19 ++-
 .../common/task/switchtask/SwitchParameters.java   |  91 +++++++++++
 .../common/task/switchtask/SwitchResultVo.java     |  49 ++++++
 .../common/utils/TaskParametersUtils.java          |   3 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  25 +++
 .../dolphinscheduler/dao/utils/DagHelper.java      |  50 +++++-
 .../dolphinscheduler/dao/utils/DagHelperTest.java  |  99 +++++++++++-
 .../master/runner/MasterBaseTaskExecThread.java    |  15 +-
 .../server/master/runner/MasterExecThread.java     |   2 +
 .../server/master/runner/SwitchTaskExecThread.java | 180 +++++++++++++++++++++
 .../server/utils/SwitchTaskUtils.java              |  38 +++++
 .../server/master/SwitchTaskTest.java              | 167 +++++++++++++++++++
 .../service/process/ProcessService.java            |   1 +
 .../service/process/ProcessServiceTest.java        |  14 +-
 pom.xml                                            |   1 +
 18 files changed, 765 insertions(+), 39 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
index aca9771..ad2f574 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
@@ -43,8 +43,7 @@ public class CheckUtils {
     /**
      * check username
      *
-     * @param userName
-     *            user name
+     * @param userName user name
      * @return true if user name regex valid,otherwise return false
      */
     public static boolean checkUserName(String userName) {
@@ -54,8 +53,7 @@ public class CheckUtils {
     /**
      * check email
      *
-     * @param email
-     *            email
+     * @param email email
      * @return true if email regex valid, otherwise return false
      */
     public static boolean checkEmail(String email) {
@@ -69,8 +67,7 @@ public class CheckUtils {
     /**
      * check project description
      *
-     * @param desc
-     *            desc
+     * @param desc desc
      * @return true if description regex valid, otherwise return false
      */
     public static Map<String, Object> checkDesc(String desc) {
@@ -78,7 +75,7 @@ public class CheckUtils {
         if (StringUtils.isNotEmpty(desc) && desc.length() > 200) {
             result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
             result.put(Constants.MSG,
-                MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
+                    MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
         } else {
             result.put(Constants.STATUS, Status.SUCCESS);
         }
@@ -88,8 +85,7 @@ public class CheckUtils {
     /**
      * check extra info
      *
-     * @param otherParams
-     *            other parames
+     * @param otherParams other params
      * @return true if other parameters are valid, otherwise return false
      */
     public static boolean checkOtherParams(String otherParams) {
@@ -99,8 +95,7 @@ public class CheckUtils {
     /**
      * check password
      *
-     * @param password
-     *            password
+     * @param password password
      * @return true if password regex valid, otherwise return false
      */
     public static boolean checkPassword(String password) {
@@ -110,8 +105,7 @@ public class CheckUtils {
     /**
      * check phone phone can be empty.
      *
-     * @param phone
-     *            phone
+     * @param phone phone
      * @return true if phone regex valid, otherwise return false
      */
     public static boolean checkPhone(String phone) {
@@ -121,8 +115,7 @@ public class CheckUtils {
     /**
      * check task node parameter
      *
-     * @param taskNode
-     *            TaskNode
+     * @param taskNode TaskNode
      * @return true if task node parameters are valid, otherwise return false
      */
     public static boolean checkTaskNodeParameters(TaskNode taskNode) {
@@ -133,6 +126,8 @@ public class CheckUtils {
         }
         if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
             abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence());
+        } else if (TaskType.SWITCH.getDesc().equalsIgnoreCase(taskType)) {
+            abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getSwitchResult());
         } else {
             abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams());
         }
@@ -147,28 +142,22 @@ public class CheckUtils {
     /**
      * check params
      *
-     * @param userName
-     *            user name
-     * @param password
-     *            password
-     * @param email
-     *            email
-     * @param phone
-     *            phone
+     * @param userName user name
+     * @param password password
+     * @param email    email
+     * @param phone    phone
      * @return true if user parameters are valid, other return false
      */
     public static boolean checkUserParams(String userName, String password, String email, String phone) {
         return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password)
-               && CheckUtils.checkPhone(phone);
+                && CheckUtils.checkPhone(phone);
     }
 
     /**
      * regex check
      *
-     * @param str
-     *            input string
-     * @param pattern
-     *            regex pattern
+     * @param str     input string
+     * @param pattern regex pattern
      * @return true if regex pattern is right, otherwise return false
      */
     private static boolean regexChecks(String str, Pattern pattern) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 53645b7..e2b8a0c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -776,6 +776,7 @@ public final class Constants {
     public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
     public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
     public static final String CONDITION_RESULT = "conditionResult";
+    public static final String SWITCH_RESULT = "switchResult";
     public static final String DEPENDENCE = "dependence";
     public static final String TASK_TYPE = "taskType";
     public static final String TASK_LIST = "taskList";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
index d0842e4..3792368 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
@@ -51,7 +51,9 @@ public enum TaskType {
     DATAX(10, "DATAX"),
     CONDITIONS(11, "CONDITIONS"),
     SQOOP(12, "SQOOP"),
-    WATERDROP(13, "WATERDROP");
+    WATERDROP(13, "WATERDROP"),
+    SWITCH(14, "SWITCH"),
+    ;
 
     TaskType(int code, String desc) {
         this.code = code;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index b9c5a28..2e9262d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.model;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -33,7 +34,6 @@ import java.util.Objects;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-
 public class TaskNode {
 
     /**
@@ -129,6 +129,10 @@ public class TaskNode {
     @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
     private String conditionResult;
 
+    @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
+    @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
+    private String switchResult;
+
     /**
      * task instance priority
      */
@@ -365,6 +369,10 @@ public class TaskNode {
         return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType());
     }
 
+    public boolean isSwitchTask() {
+        return TaskType.SWITCH.toString().equalsIgnoreCase(this.getType());
+    }
+
     public List<PreviousTaskNode> getPreTaskNodeList() {
         return preTaskNodeList;
     }
@@ -380,6 +388,7 @@ public class TaskNode {
         }
         taskParams.put(Constants.CONDITION_RESULT, this.conditionResult);
         taskParams.put(Constants.DEPENDENCE, this.dependence);
+        taskParams.put(Constants.SWITCH_RESULT, this.switchResult);
         return JSONUtils.toJsonString(taskParams);
     }
 
@@ -417,4 +426,12 @@ public class TaskNode {
                 + ", delayTime=" + delayTime
                 + '}';
     }
+
+    public String getSwitchResult() {
+        return switchResult;
+    }
+
+    public void setSwitchResult(String switchResult) {
+        this.switchResult = switchResult;
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
new file mode 100644
index 0000000..dc59795
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchParameters.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task.switchtask;
+
+import org.apache.dolphinscheduler.common.enums.DependentRelation;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SwitchParameters extends AbstractParameters {
+
+    private DependentRelation dependRelation;
+    private String relation;
+    private List<String> nextNode;
+
+    @Override
+    public boolean checkParameters() {
+        return true;
+    }
+
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return new ArrayList<>();
+    }
+
+    private int resultConditionLocation;
+    private List<SwitchResultVo> dependTaskList;
+
+    public DependentRelation getDependRelation() {
+        return dependRelation;
+    }
+
+    public void setDependRelation(DependentRelation dependRelation) {
+        this.dependRelation = dependRelation;
+    }
+
+    public int getResultConditionLocation() {
+        return resultConditionLocation;
+    }
+
+    public void setResultConditionLocation(int resultConditionLocation) {
+        this.resultConditionLocation = resultConditionLocation;
+    }
+
+    public String getRelation() {
+        return relation;
+    }
+
+    public void setRelation(String relation) {
+        this.relation = relation;
+    }
+
+    public List<SwitchResultVo> getDependTaskList() {
+        return dependTaskList;
+    }
+
+    public void setDependTaskList(List<SwitchResultVo> dependTaskList) {
+        this.dependTaskList = dependTaskList;
+    }
+
+    public List<String> getNextNode() {
+        return nextNode;
+    }
+
+    public void setNextNode(Object nextNode) {
+        if (nextNode instanceof String) {
+            List<String> nextNodeList = new ArrayList<>();
+            nextNodeList.add(String.valueOf(nextNode));
+            this.nextNode = nextNodeList;
+        } else {
+            this.nextNode = (ArrayList) nextNode;
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
new file mode 100644
index 0000000..558a6f1
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/switchtask/SwitchResultVo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task.switchtask;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SwitchResultVo {
+
+    private String condition;
+    private List<String> nextNode;
+
+    public String getCondition() {
+        return condition;
+    }
+
+    public void setCondition(String condition) {
+        this.condition = condition;
+    }
+
+    public List<String> getNextNode() {
+        return nextNode;
+    }
+
+    public void setNextNode(Object nextNode) {
+        if (nextNode instanceof String) {
+            List<String> nextNodeList = new ArrayList<>();
+            nextNodeList.add(String.valueOf(nextNode));
+            this.nextNode = nextNodeList;
+        } else {
+            this.nextNode = (ArrayList) nextNode;
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
index 740635c..f5e9dec 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +83,8 @@ public class TaskParametersUtils {
                 return JSONUtils.parseObject(parameter, ConditionsParameters.class);
             case "SQOOP":
                 return JSONUtils.parseObject(parameter, SqoopParameters.class);
+            case "SWITCH":
+                return JSONUtils.parseObject(parameter, SwitchParameters.class);
             default:
                 logger.error("not support task type: {}", taskType);
                 return null;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index aa87272..2be4ad6 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
@@ -175,6 +176,12 @@ public class TaskInstance implements Serializable {
     private DependentParameters dependency;
 
     /**
+     * switch dependency
+     */
+    @TableField(exist = false)
+    private SwitchParameters switchDependency;
+
+    /**
      * duration
      */
     @TableField(exist = false)
@@ -426,6 +433,20 @@ public class TaskInstance implements Serializable {
         this.dependency = dependency;
     }
 
+    public SwitchParameters getSwitchDependency() {
+        if (this.switchDependency == null) {
+            Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
+            this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class);
+        }
+        return this.switchDependency;
+    }
+
+    public void setSwitchDependency(SwitchParameters switchDependency) {
+        Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
+        taskParamsMap.put(Constants.SWITCH_RESULT,JSONUtils.toJsonString(switchDependency));
+        this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
+    }
+
     public Flag getFlag() {
         return flag;
     }
@@ -510,6 +531,10 @@ public class TaskInstance implements Serializable {
         return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType);
     }
 
+    public boolean isSwitchTask() {
+        return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType);
+    }
+
     /**
      * determine if you can try again
      *
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 025b825..de27f17 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.dao.utils;
 
+package org.apache.dolphinscheduler.dao.utils;
 
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.graph.DAG;
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
@@ -281,6 +283,9 @@ public class DagHelper {
         } else if (dag.getNode(preNodeName).isConditionsTask()) {
             List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
             startVertexes.addAll(conditionTaskList);
+        } else if (dag.getNode(preNodeName).isSwitchTask()) {
+            List<String> conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
+            startVertexes.addAll(conditionTaskList);
         } else {
             startVertexes = dag.getSubsequentNodes(preNodeName);
         }
@@ -356,6 +361,49 @@ public class DagHelper {
     }
 
     /**
+     * parse switch task, find the selected branch to run
+     * and set the skip flag for the other branches.
+     *
+     * @param nodeName name of the switch task node
+     * @return names of the next nodes on the selected branch
+     */
+    public static List<String> parseSwitchTask(String nodeName,
+                                               Map<String, TaskNode> skipTaskNodeList,
+                                               DAG<String, TaskNode, TaskNodeRelation> dag,
+                                               Map<String, TaskInstance> completeTaskList) {
+        List<String> conditionTaskList = new ArrayList<>();
+        TaskNode taskNode = dag.getNode(nodeName);
+        if (!taskNode.isSwitchTask()) {
+            return conditionTaskList;
+        }
+        if (!completeTaskList.containsKey(nodeName)) {
+            return conditionTaskList;
+        }
+        conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
+        return conditionTaskList;
+    }
+
+    private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
+                                                    Map<String, TaskInstance> completeTaskList,
+                                                    DAG<String, TaskNode, TaskNodeRelation> dag) {
+        SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency();
+        int resultConditionLocation = switchParameters.getResultConditionLocation();
+        List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
+        List<String> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
+        if (CollectionUtils.isEmpty(switchTaskList)) {
+            switchTaskList = new ArrayList<>();
+        }
+        conditionResultVoList.remove(resultConditionLocation);
+        for (SwitchResultVo info : conditionResultVoList) {
+            if (CollectionUtils.isEmpty(info.getNextNode())) {
+                continue;
+            }
+            setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList);
+        }
+        return switchTaskList;
+    }
+
+    /**
      * set task node and the post nodes skip flag
      */
     private static void setTaskNodeSkip(String skipNodeName,
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index c486ed9..18c17fe 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -251,6 +253,10 @@ public class DagHelperTest {
         skipNodeList.clear();
         completeTaskList.remove("3");
         taskInstance = new TaskInstance();
+
+        Map<String, Object> taskParamsMap = new HashMap<>();
+        taskParamsMap.put(Constants.SWITCH_RESULT, "");
+        taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
         taskInstance.setState(ExecutionStatus.FAILURE);
         completeTaskList.put("3", taskInstance);
         postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
@@ -259,6 +265,17 @@ public class DagHelperTest {
         Assert.assertEquals(2, skipNodeList.size());
         Assert.assertTrue(skipNodeList.containsKey("5"));
         Assert.assertTrue(skipNodeList.containsKey("7"));
+
+        // dag: 0 1-2 1-4
+        // 1-switch , complete:1
+        // 1.switch picks branch 0 (next node 2): expect 1 post node
+        dag = generateDag2();
+        skipNodeList.clear();
+        completeTaskList.clear();
+        taskInstance.setSwitchDependency(getSwitchNode());
+        completeTaskList.put("1", taskInstance);
+        postNodes = DagHelper.parsePostNodes("1", skipNodeList, dag, completeTaskList);
+        Assert.assertEquals(1, postNodes.size());
     }
 
     /**
@@ -286,7 +303,6 @@ public class DagHelperTest {
         node2.setPreTasks(JSONUtils.toJsonString(dep2));
         taskNodeList.add(node2);
 
-
         TaskNode node4 = new TaskNode();
         node4.setId("4");
         node4.setName("4");
@@ -351,6 +367,87 @@ public class DagHelperTest {
         return DagHelper.buildDagGraph(processDag);
     }
 
+    /**
+     * 0
+     * 1(switch)->2
+     * 1(switch)->4
+     *
+     * @return dag
+     * @throws JsonProcessingException if error throws JsonProcessingException
+     */
+    private DAG<String, TaskNode, TaskNodeRelation> generateDag2() throws IOException {
+        List<TaskNode> taskNodeList = new ArrayList<>();
+
+        TaskNode node = new TaskNode();
+        node.setId("0");
+        node.setName("0");
+        node.setType("SHELL");
+        taskNodeList.add(node);
+
+        TaskNode node1 = new TaskNode();
+        node1.setId("1");
+        node1.setName("1");
+        node1.setType("switch");
+        node1.setDependence(JSONUtils.toJsonString(getSwitchNode()));
+        taskNodeList.add(node1);
+
+        TaskNode node2 = new TaskNode();
+        node2.setId("2");
+        node2.setName("2");
+        node2.setType("SHELL");
+        List<String> dep2 = new ArrayList<>();
+        dep2.add("1");
+        node2.setPreTasks(JSONUtils.toJsonString(dep2));
+        taskNodeList.add(node2);
+
+        TaskNode node4 = new TaskNode();
+        node4.setId("4");
+        node4.setName("4");
+        node4.setType("SHELL");
+        List<String> dep4 = new ArrayList<>();
+        dep4.add("1");
+        node4.setPreTasks(JSONUtils.toJsonString(dep4));
+        taskNodeList.add(node4);
+
+        TaskNode node5 = new TaskNode();
+        node5.setId("4");
+        node5.setName("4");
+        node5.setType("SHELL");
+        List<String> dep5 = new ArrayList<>();
+        dep5.add("1");
+        node5.setPreTasks(JSONUtils.toJsonString(dep5));
+        taskNodeList.add(node5);
+
+        List<String> startNodes = new ArrayList<>();
+        List<String> recoveryNodes = new ArrayList<>();
+        List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
+                startNodes, recoveryNodes, TaskDependType.TASK_POST);
+        List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
+        ProcessDag processDag = new ProcessDag();
+        processDag.setEdges(taskNodeRelations);
+        processDag.setNodes(destTaskNodeList);
+        return DagHelper.buildDagGraph(processDag);
+    }
+
+    private SwitchParameters getSwitchNode() {
+        SwitchParameters conditionsParameters = new SwitchParameters();
+        SwitchResultVo switchResultVo1 = new SwitchResultVo();
+        switchResultVo1.setCondition(" 2 == 1");
+        switchResultVo1.setNextNode("2");
+        SwitchResultVo switchResultVo2 = new SwitchResultVo();
+        switchResultVo2.setCondition(" 2 == 2");
+        switchResultVo2.setNextNode("4");
+        List<SwitchResultVo> list = new ArrayList<>();
+        list.add(switchResultVo1);
+        list.add(switchResultVo2);
+        conditionsParameters.setDependTaskList(list);
+        conditionsParameters.setNextNode("5");
+        conditionsParameters.setRelation("AND");
+
+        // switch branches: " 2 == 1" -> 2, " 2 == 2" -> 4; top-level next node: 5
+        return conditionsParameters;
+    }
+
     @Test
     public void testBuildDagGraph() {
         String shellJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\","
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index cfd8a9a..da62982 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -201,7 +203,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         try {
             if (taskInstance.isConditionsTask()
                     || taskInstance.isDependTask()
-                    || taskInstance.isSubProcess()) {
+                    || taskInstance.isSubProcess()
+                    || taskInstance.isSwitchTask()
+            ) {
                 return true;
             }
             if (taskInstance.getState().typeIsFinished()) {
@@ -321,4 +325,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
         return timeoutSeconds - usedTime;
     }
+
+    /**
+     * Re-binds {@code logger} to a task-specific logger and builds the name to give
+     * the executing thread.
+     *
+     * <p>Note that this is not a pure getter: the {@code logger} field is replaced as a side effect.
+     *
+     * @return the task app id of the current task instance, formatted with
+     *         {@code Constants.TASK_LOG_INFO_FORMAT}
+     */
+    protected String getThreadName() {
+        String taskLoggerName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId());
+        logger = LoggerFactory.getLogger(taskLoggerName);
+
+        String taskAppId = processService.formatTaskAppId(this.taskInstance);
+        return String.format(Constants.TASK_LOG_INFO_FORMAT, taskAppId);
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 1863087..856b833 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -433,6 +433,8 @@ public class MasterExecThread implements Runnable {
             abstractExecThread = new DependentTaskExecThread(taskInstance);
         } else if (taskInstance.isConditionsTask()) {
             abstractExecThread = new ConditionsTaskExecThread(taskInstance);
+        } else if (taskInstance.isSwitchTask()) {
+            abstractExecThread = new SwitchTaskExecThread(taskInstance);
         } else {
             abstractExecThread = new MasterTaskExecThread(taskInstance);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java
new file mode 100644
index 0000000..f9e7f42
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SwitchTaskExecThread.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.enums.DependResult;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
+import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Master-side execution thread for a SWITCH task node.
+ *
+ * <p>The branch conditions configured in {@link SwitchParameters} are evaluated in order.
+ * The index of the first branch whose condition is empty or evaluates to {@code true} is
+ * stored through {@link SwitchParameters#setResultConditionLocation}; when no branch matches,
+ * the index of the default branch (appended from {@link SwitchParameters#getNextNode()}) is stored.
+ */
+public class SwitchTaskExecThread extends MasterBaseTaskExecThread {
+
+    /**
+     * matches a ${name} placeholder together with any quote characters directly around it;
+     * group 1 is the parameter name
+     */
+    protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
+
+    /**
+     * complete task map (task name to state); filled in waitTaskQuit and not read by this class
+     */
+    private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
+
+    /**
+     * switch result
+     */
+    private DependResult conditionResult;
+
+    /**
+     * constructor of SwitchTaskExecThread
+     *
+     * @param taskInstance task instance
+     */
+    public SwitchTaskExecThread(TaskInstance taskInstance) {
+        super(taskInstance);
+        taskInstance.setStartTime(new Date());
+    }
+
+    /**
+     * Submits the task instance, evaluates the switch branches and persists the final state.
+     * Any exception is logged and not rethrown.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public Boolean submitWaitComplete() {
+        try {
+            this.taskInstance = submit();
+            logger.info("taskInstance submit end");
+            Thread.currentThread().setName(getThreadName());
+            initTaskParameters();
+            logger.info("switch task start");
+            waitTaskQuit();
+            updateTaskState();
+        } catch (Exception e) {
+            logger.error("switch task run exception", e);
+        }
+        return true;
+    }
+
+    /**
+     * Evaluates the branch conditions in order and records the selected branch index on the
+     * task instance's switch parameters. Sets {@code conditionResult} to FAILED when a
+     * condition cannot be prepared or evaluated, SUCCESS otherwise.
+     */
+    private void waitTaskQuit() {
+        List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
+                taskInstance.getProcessInstanceId()
+        );
+        for (TaskInstance task : taskInstances) {
+            completeTaskList.putIfAbsent(task.getName(), task.getState());
+        }
+
+        SwitchParameters switchParameters = taskInstance.getSwitchDependency();
+        List<SwitchResultVo> switchResultVos = switchParameters.getDependTaskList();
+        // the default branch has no condition and is appended last, so it is chosen when nothing else matches
+        SwitchResultVo defaultBranch = new SwitchResultVo();
+        defaultBranch.setNextNode(switchParameters.getNextNode());
+        switchResultVos.add(defaultBranch);
+        int finalConditionLocation = switchResultVos.size() - 1;
+        int i = 0;
+        conditionResult = DependResult.SUCCESS;
+        for (SwitchResultVo info : switchResultVos) {
+            logger.info("the {} execution ", (i + 1));
+            logger.info("original condition sentence:{}", info.getCondition());
+            if (StringUtils.isEmpty(info.getCondition())) {
+                finalConditionLocation = i;
+                break;
+            }
+            String content = null;
+            boolean matched;
+            try {
+                // substitution happens inside the try so that a bad parameter definition fails
+                // the task instead of escaping and leaving it in RUNNING_EXECUTION
+                content = setTaskParams(info.getCondition().replaceAll("'", "\""), rgex);
+                logger.info("format condition sentence::{}", content);
+                matched = SwitchTaskUtils.evaluate(content);
+            } catch (Exception e) {
+                logger.error("error sentence : {}", content == null ? info.getCondition() : content, e);
+                conditionResult = DependResult.FAILED;
+                break;
+            }
+            logger.info("condition result : {}", matched);
+            if (matched) {
+                finalConditionLocation = i;
+                break;
+            }
+            i++;
+        }
+        switchParameters.setDependTaskList(switchResultVos);
+        switchParameters.setResultConditionLocation(finalConditionLocation);
+        taskInstance.setSwitchDependency(switchParameters);
+
+        logger.info("the switch task depend result : {}", conditionResult);
+    }
+
+    /**
+     * update task state: KILL when cancelled, otherwise SUCCESS or FAILURE depending on
+     * the switch result
+     */
+    private void updateTaskState() {
+        ExecutionStatus status;
+        if (this.cancel) {
+            status = ExecutionStatus.KILL;
+        } else {
+            status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
+        }
+        taskInstance.setEndTime(new Date());
+        taskInstance.setState(status);
+        processService.updateTaskInstance(taskInstance);
+    }
+
+    /**
+     * set log path, start time, host and RUNNING_EXECUTION state on the task instance and save it
+     */
+    private void initTaskParameters() {
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId()));
+        this.taskInstance.setStartTime(new Date());
+        this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
+        this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        this.processService.saveTaskInstance(taskInstance);
+    }
+
+    /**
+     * Replaces every ${name} placeholder in a condition with the value of the matching parameter.
+     *
+     * <p>Parameters come from the process instance's global params and the task instance's
+     * var pool; on a name clash the global param wins. Numeric values are inserted as-is,
+     * every other value is inserted as an escaped, double-quoted script string.
+     *
+     * @param content condition text containing ${name} placeholders
+     * @param rgex    regular expression whose group 1 captures the parameter name
+     * @return the condition with placeholders replaced, or an empty string when a
+     *         placeholder has no matching parameter
+     */
+    public String setTaskParams(String content, String rgex) {
+        Pattern pattern = Pattern.compile(rgex);
+        Matcher m = pattern.matcher(content);
+        Map<String, Property> globalParams = toPropertyMap(processInstance.getGlobalParams());
+        Map<String, Property> varParams = toPropertyMap(taskInstance.getVarPool());
+        if (varParams.size() > 0) {
+            varParams.putAll(globalParams);
+            globalParams = varParams;
+        }
+        // NOTE(review): the pattern also matches quotes around the placeholder, but only the
+        // ${name} part is replaced, so a quoted placeholder with a non-numeric value ends up
+        // double-quoted twice - confirm whether the surrounding quotes were meant to be consumed
+        while (m.find()) {
+            String paramName = m.group(1);
+            Property property = globalParams.get(paramName);
+            if (property == null) {
+                return "";
+            }
+            String value = property.getValue();
+            if (!org.apache.commons.lang.math.NumberUtils.isNumber(value)) {
+                value = quoteAsScriptString(value);
+            }
+            logger.info("paramName:{},paramValue{}", paramName, value);
+            content = content.replace("${" + paramName + "}", value);
+        }
+        return content;
+    }
+
+    /**
+     * parse a JSON array of properties into a map keyed by property name
+     */
+    private Map<String, Property> toPropertyMap(String propertiesJson) {
+        return JSONUtils.toList(propertiesJson, Property.class).stream()
+                .collect(Collectors.toMap(Property::getProp, property -> property));
+    }
+
+    /**
+     * wrap a value in double quotes, escaping the characters that would otherwise end or
+     * corrupt the string literal in the evaluated expression
+     */
+    private static String quoteAsScriptString(String value) {
+        String escaped = String.valueOf(value)
+                .replace("\\", "\\\\")
+                .replace("\"", "\\\"")
+                .replace("\n", "\\n")
+                .replace("\r", "\\r");
+        return "\"" + escaped + "\"";
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java
new file mode 100644
index 0000000..6320feb
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SwitchTaskUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.utils;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+
+/**
+ * Evaluates the condition expressions of SWITCH task branches with the JVM's
+ * "js" {@link ScriptEngine}.
+ */
+public class SwitchTaskUtils {
+
+    private static final String ENGINE_NAME = "js";
+
+    private static final ScriptEngineManager manager = new ScriptEngineManager();
+
+    /**
+     * shared engine; {@code null} when the running JVM provides no engine named "js"
+     */
+    private static final ScriptEngine engine = manager.getEngineByName(ENGINE_NAME);
+
+    /**
+     * Evaluates an expression that must produce a boolean.
+     *
+     * <p>Calls are serialized because a single engine instance is shared and script engines
+     * are not required to be thread-safe.
+     *
+     * @param expression script expression, e.g. {@code 2 == 2}
+     * @return the boolean value of the expression
+     * @throws ScriptException       if the expression cannot be evaluated or does not
+     *                               evaluate to a boolean
+     * @throws IllegalStateException if no "js" script engine is available in this JVM
+     */
+    public static synchronized boolean evaluate(String expression) throws ScriptException {
+        if (engine == null) {
+            throw new IllegalStateException(
+                    "No script engine named '" + ENGINE_NAME + "' is available in this JVM");
+        }
+        Object result = engine.eval(expression);
+        if (!(result instanceof Boolean)) {
+            throw new ScriptException(
+                    "Expression did not evaluate to a boolean: " + expression + " => " + result);
+        }
+        return (Boolean) result;
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
new file mode 100644
index 0000000..0c2d74a
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
+import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.SwitchTaskExecThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * Unit test for {@link SwitchTaskExecThread}, run against a mocked {@link ProcessService}
+ * and a mocked Spring {@link ApplicationContext}.
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class SwitchTaskTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(SwitchTaskTest.class);
+
+    /**
+     * TaskNode.runFlag : task can be run normally
+     */
+    public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
+
+    private ProcessService processService;
+
+    private ProcessInstance processInstance;
+
+    /**
+     * Registers a mocked application context that serves the master config and the mocked
+     * process service, and stubs the lookup of the process instance under test.
+     */
+    @Before
+    public void before() {
+        ApplicationContext mockedContext = Mockito.mock(ApplicationContext.class);
+        new SpringApplicationContext().setApplicationContext(mockedContext);
+
+        MasterConfig masterConfig = new MasterConfig();
+        masterConfig.setMasterTaskCommitRetryTimes(3);
+        masterConfig.setMasterTaskCommitInterval(1000);
+        Mockito.when(mockedContext.getBean(MasterConfig.class)).thenReturn(masterConfig);
+
+        processService = Mockito.mock(ProcessService.class);
+        Mockito.when(mockedContext.getBean(ProcessService.class)).thenReturn(processService);
+
+        processInstance = getProcessInstance();
+        Mockito.when(processService.findProcessInstanceById(processInstance.getId()))
+                .thenReturn(processInstance);
+    }
+
+    /**
+     * Builds the switch task instance and stubs every process service call the
+     * execution thread makes for it.
+     */
+    private TaskInstance testBasicInit(ExecutionStatus expectResult) {
+        TaskDefinition definition = new TaskDefinition();
+        definition.setTimeoutFlag(TimeoutFlag.OPEN);
+        definition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
+        definition.setTimeout(0);
+        Mockito.when(processService.findTaskDefinition(1L, 1)).thenReturn(definition);
+
+        TaskInstance switchTask = getTaskInstance(getTaskNode(), processInstance);
+
+        // for MasterBaseTaskExecThread.submit
+        Mockito.when(processService.submitTask(switchTask)).thenReturn(switchTask);
+        // for MasterBaseTaskExecThread.call
+        Mockito.when(processService.findTaskInstanceById(switchTask.getId())).thenReturn(switchTask);
+        // for SwitchTaskExecThread.initTaskParameters
+        Mockito.when(processService.saveTaskInstance(switchTask)).thenReturn(true);
+        // for SwitchTaskExecThread.updateTaskState
+        Mockito.when(processService.updateTaskInstance(switchTask)).thenReturn(true);
+
+        return switchTask;
+    }
+
+    /**
+     * A switch task with evaluable conditions must finish in SUCCESS state.
+     */
+    @Test
+    public void testExe() throws Exception {
+        TaskInstance switchTask = testBasicInit(ExecutionStatus.SUCCESS);
+        switchTask.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+
+        SwitchTaskExecThread execThread = new SwitchTaskExecThread(switchTask);
+        execThread.call();
+
+        Assert.assertEquals(ExecutionStatus.SUCCESS, execThread.getTaskInstance().getState());
+    }
+
+    /**
+     * Switch parameters with three conditional branches (only the second one is true)
+     * and default next node "t".
+     */
+    private SwitchParameters getTaskNode() {
+        // must stay a mutable list: the execution thread appends the default branch to it
+        List<SwitchResultVo> branches = new ArrayList<>();
+        branches.add(newBranch(" 2 == 1", "t1"));
+        branches.add(newBranch(" 2 == 2", "t2"));
+        branches.add(newBranch(" 3 == 2", "t3"));
+
+        SwitchParameters switchParameters = new SwitchParameters();
+        switchParameters.setDependTaskList(branches);
+        switchParameters.setNextNode("t");
+        switchParameters.setRelation("AND");
+        return switchParameters;
+    }
+
+    /**
+     * Creates one switch branch from its condition and target node name.
+     */
+    private static SwitchResultVo newBranch(String condition, String nextNode) {
+        SwitchResultVo branch = new SwitchResultVo();
+        branch.setCondition(condition);
+        branch.setNextNode(nextNode);
+        return branch;
+    }
+
+    /**
+     * Running process instance with id 1000 and process definition code 1.
+     */
+    private ProcessInstance getProcessInstance() {
+        ProcessInstance instance = new ProcessInstance();
+        instance.setId(1000);
+        instance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        instance.setProcessDefinitionCode(1L);
+        return instance;
+    }
+
+    /**
+     * SWITCH task instance "C" with id 1000 that belongs to the given process instance
+     * and carries the given switch parameters.
+     */
+    private TaskInstance getTaskInstance(SwitchParameters conditionsParameters, ProcessInstance processInstance) {
+        Map<String, Object> params = new HashMap<>();
+        params.put(Constants.SWITCH_RESULT, "");
+
+        TaskInstance switchTask = new TaskInstance();
+        switchTask.setId(1000);
+        // task params are set before the switch dependency, in the same order as before
+        switchTask.setTaskParams(JSONUtils.toJsonString(params));
+        switchTask.setSwitchDependency(conditionsParameters);
+        switchTask.setName("C");
+        switchTask.setTaskType("SWITCH");
+        switchTask.setProcessInstanceId(processInstance.getId());
+        switchTask.setTaskCode(1L);
+        switchTask.setTaskDefinitionVersion(1);
+        return switchTask;
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f7b5de3..ac3e78d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2458,6 +2458,7 @@ public class ProcessService {
             v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
             Map<String, Object> taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
             v.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
+            v.setSwitchResult((String) taskParamsMap.get(Constants.SWITCH_RESULT));
             v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
             taskParamsMap.remove(Constants.CONDITION_RESULT);
             taskParamsMap.remove(Constants.DEPENDENCE);
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 643dc09..d0a7351 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -424,12 +424,14 @@ public class ProcessServiceTest {
 
     @Test
     public void testGenProcessData() {
-        String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null,"
-                + "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0,"
-                + "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\","
-                + "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
-                + "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null,"
-                + "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
+        String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\","
+                + "\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,"
+                + "\"retryInterval\":0,\"params\":{},\"preTasks\":[\"unit-test\"],"
+                + "\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\",\"version\":0}],"
+                + "\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
+                + "\"switchResult\":null,\"taskInstancePriority\":null,\"workerGroup\":null,"
+                + "\"timeout\":{\"enable\":false,\"strategy\":null,\"interval\":0},\"delayTime\":0}],"
+                + "\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
 
         ProcessDefinition processDefinition = new ProcessDefinition();
         processDefinition.setCode(1L);
diff --git a/pom.xml b/pom.xml
index 705a54b..522d9b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -992,6 +992,7 @@
                         <include>**/server/master/MasterCommandTest.java</include>
                         <include>**/server/master/DependentTaskTest.java</include>
                         <include>**/server/master/ConditionsTaskTest.java</include>
+                        <include>**/server/master/SwitchTaskTest.java</include>
                         <include>**/server/master/MasterExecThreadTest.java</include>
                         <include>**/server/master/ParamsTest.java</include>
                         <include>**/server/master/SubProcessTaskTest.java</include>