You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/11 14:51:15 UTC

[dolphinscheduler] branch 2.0.3-prepare updated: [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
     new 46a5d17  [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)
46a5d17 is described below

commit 46a5d17bf673427c8ecc1bfce9864dbc7625b269
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Tue Jan 11 22:51:00 2022 +0800

    [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)
    
    * [cherry-pick][Improvement][TaskLog] Unified task log #7831
    
    * [cherry-pick][Improvement][TaskLog] Unified task log #7831
    
    * fix thread name
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../master/runner/WorkflowExecuteThread.java       |  7 +-
 .../master/runner/task/BaseTaskProcessor.java      | 82 ++++++++++++++++++----
 .../master/runner/task/CommonTaskProcessor.java    | 20 ++----
 .../master/runner/task/ConditionTaskProcessor.java | 29 ++------
 .../master/runner/task/DependentTaskProcessor.java | 17 ++---
 .../server/master/runner/task/ITaskProcessor.java  |  4 +-
 .../master/runner/task/SubTaskProcessor.java       | 20 +++---
 .../master/runner/task/SwitchTaskProcessor.java    | 19 ++---
 .../server/master/runner/task/TaskAction.java      |  4 +-
 9 files changed, 108 insertions(+), 94 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 5988839..7907686 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -359,7 +359,7 @@ public class WorkflowExecuteThread implements Runnable {
             taskFinished(task);
         } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
             ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
-            iTaskProcessor.run();
+            iTaskProcessor.action(TaskAction.RUN);
 
             if (iTaskProcessor.taskState().typeIsFinished()) {
                 task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
@@ -630,11 +630,12 @@ public class WorkflowExecuteThread implements Runnable {
                     && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
             }
-            boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getMasterTaskCommitRetryTimes(), masterConfig.getMasterTaskCommitInterval());
+            taskProcessor.init(taskInstance, processInstance);
+            boolean submit = taskProcessor.action(TaskAction.SUBMIT);
             if (submit) {
                 this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
                 activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
-                taskProcessor.run();
+                taskProcessor.action(TaskAction.RUN);
                 addTimeoutCheck(taskInstance);
                 addRetryCheck(taskInstance);
                 TaskDefinition taskDefinition = processService.findTaskDefinition(
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 4446485..508feb5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -39,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
@@ -68,7 +70,7 @@ import com.google.common.base.Strings;
 
 public abstract class BaseTaskProcessor implements ITaskProcessor {
 
-    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME, getClass()));
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
 
     protected boolean killed = false;
 
@@ -78,10 +80,32 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected TaskInstance taskInstance = null;
 
-    protected ProcessInstance processInstance;
+    protected ProcessInstance processInstance = null;
+
+    protected int maxRetryTimes;
+
+    protected int commitInterval;
 
     protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
 
+    protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+
+    protected String threadLoggerInfoName;
+
+    @Override
+    public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
+        if (processService == null) {
+            processService = SpringApplicationContext.getBean(ProcessService.class);
+        }
+        if (masterConfig == null) {
+            masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+        }
+        this.taskInstance = taskInstance;
+        this.processInstance = processInstance;
+        this.maxRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
+        this.commitInterval = masterConfig.getMasterTaskCommitInterval();
+    }
+
     /**
      * persist task
      *
@@ -91,21 +115,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     /**
      * pause task, common tasks donot need this.
-     *
-     * @return
      */
     protected abstract boolean pauseTask();
 
     /**
      * kill task, all tasks need to realize this function
-     *
-     * @return
      */
     protected abstract boolean killTask();
 
     /**
      * task timeout process
-     * @return
      */
     protected abstract boolean taskTimeout();
 
@@ -119,12 +138,22 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         return persistTask(taskAction);
     }
 
-    @Override
-    public void run() {
-    }
+    /**
+     * submit task
+     */
+    protected abstract boolean submitTask();
+
+    /**
+     * run task
+     */
+    protected abstract boolean runTask();
 
     @Override
     public boolean action(TaskAction taskAction) {
+        String threadName = Thread.currentThread().getName();
+        if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
+            Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
+        }
 
         switch (taskAction) {
             case STOP:
@@ -133,13 +162,27 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
                 return pause();
             case TIMEOUT:
                 return timeout();
+            case SUBMIT:
+                return submit();
+            case RUN:
+                return run();
             default:
                 logger.error("unknown task action: {}", taskAction.toString());
-
         }
+
+        // reset thread name
+        Thread.currentThread().setName(threadName);
         return false;
     }
 
+    protected boolean submit() {
+        return submitTask();
+    }
+
+    protected boolean run() {
+        return runTask();
+    }
+
     protected boolean timeout() {
         if (timeout) {
             return true;
@@ -148,9 +191,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         return timeout;
     }
 
-    /**
-     * @return
-     */
     protected boolean pause() {
         if (paused) {
             return true;
@@ -173,6 +213,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
     }
 
     /**
+     * set master task running logger.
+     */
+    public void setTaskExecutionLogger() {
+        threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId());
+        Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
+    }
+
+    /**
      * get TaskExecutionContext
      *
      * @param taskInstance taskInstance
@@ -321,7 +373,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
         // whether udf type
         boolean udfTypeFlag = Enums.getIfPresent(UdfType.class, Strings.nullToEmpty(sqlParameters.getType())).isPresent()
-            && !StringUtils.isEmpty(sqlParameters.getUdfs());
+                && !StringUtils.isEmpty(sqlParameters.getUdfs());
 
         if (udfTypeFlag) {
             String[] udfFunIds = sqlParameters.getUdfs().split(",");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 14bb3af..ca78ea3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -39,8 +38,6 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.Date;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
@@ -52,24 +49,16 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     private TaskPriorityQueue taskUpdateQueue;
 
     @Autowired
-    MasterConfig masterConfig;
-
-    @Autowired
     NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
 
-    /**
-     * logger of MasterBaseTaskExecThread
-     */
-    protected Logger logger = LoggerFactory.getLogger(getClass());
-
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
-        this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
+    public boolean submitTask() {
+        this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
         }
+        setTaskExecutionLogger();
         return dispatchTask(taskInstance, processInstance);
     }
 
@@ -79,7 +68,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public void run() {
+    public boolean runTask() {
+        return true;
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 584e484..c303a80 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -26,15 +26,10 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.spi.task.TaskConstants;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -42,23 +37,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * condition task processor
  */
 public class ConditionTaskProcessor extends BaseTaskProcessor {
 
-    protected static final Logger logger = LoggerFactory.getLogger(TaskConstants.TASK_LOG_LOGGER_NAME);
-
     /**
      * dependent parameters
      */
     private DependentParameters dependentParameters;
 
-    ProcessInstance processInstance;
-
     /**
      * condition result
      */
@@ -69,14 +57,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
      */
     private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
 
-    MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
     private TaskDefinition taskDefinition;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-        this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+    public boolean submitTask() {
+        this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
@@ -85,12 +70,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
 
-        String threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-                processInstance.getProcessDefinitionCode(),
-                processInstance.getProcessDefinitionVersion(),
-                taskInstance.getProcessInstanceId(),
-                taskInstance.getId());
-        Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,threadLoggerInfoName));
+        setTaskExecutionLogger();
         initTaskParameters();
         logger.info("dependent task start");
         return true;
@@ -102,13 +82,14 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public void run() {
+    public boolean runTask() {
         if (conditionResult.equals(DependResult.WAITING)) {
             setConditionResult();
             endTask();
         } else {
             endTask();
         }
+        return true;
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0f84a5f..100c75f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -27,13 +27,9 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -69,18 +65,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
 
     DependResult result;
 
-    ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
     boolean allDependentItemFinished;
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-        this.processInstance = processInstance;
-        this.taskInstance = task;
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+    public boolean submitTask() {
+        this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
@@ -88,6 +79,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
         taskDefinition = processService.findTaskDefinition(
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
+        setTaskExecutionLogger();
         taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion(),
                 taskInstance.getProcessInstanceId(),
@@ -106,7 +98,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public void run() {
+    public boolean runTask() {
         if (!allDependentItemFinished) {
             allDependentItemFinished = allDependentTaskFinish();
         }
@@ -114,6 +106,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
             getTaskDependResult();
             endTask();
         }
+        return true;
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index aa1e490..41fb0d0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -28,14 +28,12 @@ public interface ITaskProcessor {
 
     boolean persist(TaskAction taskAction);
 
-    void run();
+    void init(TaskInstance taskInstance, ProcessInstance processInstance);
 
     boolean action(TaskAction taskAction);
 
     String getType();
 
-    boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval);
-
     ExecutionStatus taskState();
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 02f08a8..16b21f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -22,9 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
@@ -36,8 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class SubTaskProcessor extends BaseTaskProcessor {
 
-    private ProcessInstance processInstance;
-
     private ProcessInstance subProcessInstance = null;
     private TaskDefinition taskDefinition;
 
@@ -49,17 +47,22 @@ public class SubTaskProcessor extends BaseTaskProcessor {
     private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
 
     @Override
-    public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-        this.processInstance = processInstance;
+    public boolean submitTask() {
         taskDefinition = processService.findTaskDefinition(
-                task.getTaskCode(), task.getTaskDefinitionVersion()
+                taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
-        this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+        this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
         }
 
+        setTaskExecutionLogger();
+        taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion(),
+                taskInstance.getProcessInstanceId(),
+                taskInstance.getId()));
+
         return true;
     }
 
@@ -69,7 +72,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public void run() {
+    public boolean runTask() {
         try {
             this.runLock.lock();
             if (setSubWorkFlow()) {
@@ -83,6 +86,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         } finally {
             this.runLock.unlock();
         }
+        return true;
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index c48a711..7134ff0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -25,13 +25,10 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
 import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
@@ -48,27 +45,22 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
 
     protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
 
-    private TaskInstance taskInstance;
-
-    private ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-
     /**
      * switch result
      */
     private DependResult conditionResult;
 
     @Override
-    public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
-
-        this.processInstance = processInstance;
-        this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
+    public boolean submitTask() {
+        this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
 
         if (this.taskInstance == null) {
             return false;
         }
+        setTaskExecutionLogger();
+
         taskDefinition = processService.findTaskDefinition(
                 taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
         );
@@ -84,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
-    public void run() {
+    public boolean runTask() {
         try {
             if (!this.taskState().typeIsFinished() && setSwitchResult()) {
                 endTaskState();
@@ -95,6 +87,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
                     this.taskInstance.getId(),
                     e);
         }
+        return true;
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 42c8846..0c4db76 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -23,5 +23,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
 public enum  TaskAction {
     PAUSE,
     STOP,
-    TIMEOUT
+    TIMEOUT,
+    SUBMIT,
+    RUN
 }