You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/03/30 09:35:11 UTC

[dolphinscheduler] branch dev updated: [Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ddf1ff9  [Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)
ddf1ff9 is described below

commit ddf1ff98fa94e67ca29d24189d442fee8fbe3cda
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Mar 30 17:35:05 2022 +0800

    [Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)
    
    * rebase dev
    
    * change task state to dispatch if dispatch success
    
    * update task host when dispatch
    
    * add dispatch task event
    
    * test
    
    * check tenant after enable auto create
    
    * handle dispatch state
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../apache/dolphinscheduler/common/Constants.java  |  49 +++---
 .../dolphinscheduler/common/enums/Event.java       |   7 +-
 .../common/enums/TaskStateType.java                |   1 +
 .../server/master/MasterServer.java                |  12 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |  26 +++
 .../dispatch/executor/NettyExecutorManager.java    |  23 +--
 ...ssor.java => TaskExecuteResponseProcessor.java} |  30 ++--
 ...essor.java => TaskExecuteRunningProcessor.java} |  39 ++---
 .../{TaskResponseEvent.java => TaskEvent.java}     |  74 +++++----
 ...kResponseService.java => TaskEventService.java} | 150 ++++++++++-------
 .../master/runner/task/BaseTaskProcessor.java      |   7 +-
 .../server/utils/DataQualityResultOperator.java    |   6 +-
 .../server/utils/DependentExecute.java             |   1 +
 .../master/processor/TaskAckProcessorTest.java     |  91 +++++++++++
 .../processor/queue/TaskResponseServiceTest.java   |  68 +++++---
 .../remote/command/CommandType.java                |  16 +-
 .../remote/command/StateEventResponseCommand.java  |   2 +-
 ...and.java => TaskExecuteResponseAckCommand.java} |  13 +-
 .../remote/command/TaskExecuteResponseCommand.java |  71 +++++++-
 ...mand.java => TaskExecuteRunningAckCommand.java} |  13 +-
 ...Command.java => TaskExecuteRunningCommand.java} |  55 +++++--
 .../service/process/ProcessService.java            | 182 ++++++++-------------
 .../plugin/task/api/TaskExecutionContext.java      |  63 ++++++-
 .../plugin/task/api/enums/ExecutionStatus.java     |   4 +-
 .../server/worker/WorkerServer.java                |  12 +-
 .../server/worker/cache/ResponseCache.java         |  34 ++--
 .../worker/processor/TaskCallbackService.java      | 131 +++++++++++++--
 .../worker/processor/TaskExecuteProcessor.java     |  70 +++-----
 ...r.java => TaskExecuteResponseAckProcessor.java} |  28 ++--
 ...or.java => TaskExecuteRunningAckProcessor.java} |  24 +--
 .../server/worker/processor/TaskKillProcessor.java |  16 +-
 .../worker/runner/RetryReportTaskStatusThread.java |  16 +-
 .../server/worker/runner/TaskExecuteThread.java    | 106 +++---------
 .../server/worker/runner/WorkerManagerThread.java  |   2 +-
 .../worker/processor/TaskExecuteProcessorTest.java |  16 +-
 35 files changed, 873 insertions(+), 585 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e149c2e..0572e8f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -17,9 +17,10 @@
 
 package org.apache.dolphinscheduler.common;
 
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import java.util.regex.Pattern;
 
@@ -48,20 +49,20 @@ public final class Constants {
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
-    public static final String FORMAT_SS ="%s%s";
-    public static final String FORMAT_S_S ="%s/%s";
-    public static final String AWS_ACCESS_KEY_ID="aws.access.key.id";
-    public static final String AWS_SECRET_ACCESS_KEY="aws.secret.access.key";
-    public static final String AWS_REGION="aws.region";
-    public static final String FOLDER_SEPARATOR ="/";
+    public static final String FORMAT_SS = "%s%s";
+    public static final String FORMAT_S_S = "%s/%s";
+    public static final String AWS_ACCESS_KEY_ID = "aws.access.key.id";
+    public static final String AWS_SECRET_ACCESS_KEY = "aws.secret.access.key";
+    public static final String AWS_REGION = "aws.region";
+    public static final String FOLDER_SEPARATOR = "/";
 
     public static final String RESOURCE_TYPE_FILE = "resources";
 
-    public static final String RESOURCE_TYPE_UDF="udfs";
+    public static final String RESOURCE_TYPE_UDF = "udfs";
 
-    public static final String STORAGE_S3="S3";
+    public static final String STORAGE_S3 = "S3";
 
-    public static final String STORAGE_HDFS="HDFS";
+    public static final String STORAGE_HDFS = "HDFS";
 
     public static final String BUCKET_NAME = "dolphinscheduler-test";
 
@@ -71,7 +72,6 @@ public final class Constants {
     public static final String FS_DEFAULT_FS = "fs.defaultFS";
 
 
-
     /**
      * hadoop configuration
      */
@@ -254,7 +254,7 @@ public final class Constants {
      * user name regex
      */
     public static final Pattern REGEX_USER_NAME = Pattern.compile("^[a-zA-Z0-9._-]{3,39}$");
-    
+
     /**
      * read permission
      */
@@ -424,7 +424,7 @@ public final class Constants {
     /**
      * process or task definition first version
      */
-    public static final int VERSION_FIRST  = 1;
+    public static final int VERSION_FIRST = 1;
 
     /**
      * date format of yyyyMMdd
@@ -584,7 +584,6 @@ public final class Constants {
     public static final long DEPENDENT_ALL_TASK_CODE = 0;
 
 
-
     /**
      * preview schedule execute count
      */
@@ -640,20 +639,22 @@ public final class Constants {
      */
     public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
 
-    public static final int[] NOT_TERMINATED_STATES = new int[] {
-        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-        ExecutionStatus.DELAY_EXECUTION.ordinal(),
-        ExecutionStatus.READY_PAUSE.ordinal(),
-        ExecutionStatus.READY_STOP.ordinal(),
-        ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
-        ExecutionStatus.WAITING_THREAD.ordinal(),
-        ExecutionStatus.WAITING_DEPEND.ordinal()
+    public static final int[] NOT_TERMINATED_STATES = new int[]{
+            ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.DISPATCH.ordinal(),
+            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+            ExecutionStatus.DELAY_EXECUTION.ordinal(),
+            ExecutionStatus.READY_PAUSE.ordinal(),
+            ExecutionStatus.READY_STOP.ordinal(),
+            ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
+            ExecutionStatus.WAITING_THREAD.ordinal(),
+            ExecutionStatus.WAITING_DEPEND.ordinal()
     };
 
-    public static final int[] RUNNING_PROCESS_STATE = new int[] {
+    public static final int[] RUNNING_PROCESS_STATE = new int[]{
             ExecutionStatus.RUNNING_EXECUTION.ordinal(),
             ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.DISPATCH.ordinal(),
             ExecutionStatus.SERIAL_WAIT.ordinal()
     };
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 9cec276..78b036f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -18,6 +18,9 @@
 package org.apache.dolphinscheduler.common.enums;
 
 public enum Event {
-    ACK,
-    RESULT;
+    DISPATCH,
+    DELAY,
+    RUNNING,
+    RESULT,
+    ;
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
index f544b41..482cc65 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
@@ -54,6 +54,7 @@ public enum TaskStateType {
                 };
             case RUNNING:
                 return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+                        ExecutionStatus.DISPATCH.ordinal(),
                         ExecutionStatus.RUNNING_EXECUTION.ordinal(),
                         ExecutionStatus.DELAY_EXECUTION.ordinal(),
                         ExecutionStatus.READY_PAUSE.ordinal(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 22b4a69..af050ae 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,10 +27,10 @@ import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
 import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@@ -75,10 +75,10 @@ public class MasterServer implements IStoppable {
     private Scheduler scheduler;
 
     @Autowired
-    private TaskAckProcessor taskAckProcessor;
+    private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
 
     @Autowired
-    private TaskResponseProcessor taskResponseProcessor;
+    private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
 
     @Autowired
     private TaskEventProcessor taskEventProcessor;
@@ -115,8 +115,8 @@ public class MasterServer implements IStoppable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index d7d5494..06bbc30 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -24,11 +24,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
@@ -79,6 +82,11 @@ public class TaskPriorityQueueConsumer extends Thread {
     @Autowired
     private ExecutorDispatcher dispatcher;
 
+    /**
+     * processInstance cache manager
+     */
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
     /**
      * master config
@@ -87,6 +95,12 @@ public class TaskPriorityQueueConsumer extends Thread {
     private MasterConfig masterConfig;
 
     /**
+     * task response service
+     */
+    @Autowired
+    private TaskEventService taskEventService;
+
+    /**
      * consumer thread pool
      */
     private ThreadPoolExecutor consumerThreadPoolExecutor;
@@ -168,12 +182,24 @@ public class TaskPriorityQueueConsumer extends Thread {
             }
 
             result = dispatcher.dispatch(executionContext);
+
+            if (result) {
+                addDispatchEvent(context, executionContext);
+            }
         } catch (RuntimeException | ExecuteException e) {
             logger.error("dispatch error: {}", e.getMessage(), e);
         }
         return result;
     }
 
+    /**
+     * add dispatch event
+     */
+    private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
+        TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
+        taskEventService.addEvent(taskEvent);
+    }
+
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
         TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
         requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0e5333b..0ba24e2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -26,9 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -46,7 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- *  netty executor manager
+ * netty executor manager
  */
 @Service
 public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@@ -60,13 +60,13 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
     private ServerNodeManager serverNodeManager;
 
     @Autowired
-    private TaskAckProcessor taskAckProcessor;
+    private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
 
     @Autowired
     private TaskKillResponseProcessor taskKillResponseProcessor;
 
     @Autowired
-    private TaskResponseProcessor taskResponseProcessor;
+    private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
 
     /**
      * netty remote client
@@ -83,13 +83,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
 
     @PostConstruct
     public void init() {
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
         this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
     }
 
     /**
      * execute logic
+     *
      * @param context context
      * @return result
      * @throws ExecuteException if error throws ExecuteException
@@ -119,7 +120,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
         boolean success = false;
         while (!success) {
             try {
-                doExecute(host,command);
+                doExecute(host, command);
                 success = true;
                 context.setHost(host);
             } catch (ExecuteException ex) {
@@ -150,7 +151,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
     }
 
     /**
-     *  execute logic
+     * execute logic
+     *
      * @param host host
      * @param command command
      * @throws ExecuteException if error throws ExecuteException
@@ -178,7 +180,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
     }
 
     /**
-     *  get all nodes
+     * get all nodes
+     *
      * @param context context
      * @return nodes
      */
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
similarity index 64%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 47bdfca..264e42e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -18,13 +18,12 @@
 package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,15 +35,15 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 
 /**
- * task response processor
+ * task execute response processor
  */
 @Component
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseProcessor.class);
 
     @Autowired
-    private TaskResponseService taskResponseService;
+    private TaskEventService taskEventService;
 
     /**
      * task final result response
@@ -57,19 +56,10 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
-        logger.info("received command : {}", responseCommand);
+        TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
+        logger.info("received command : {}", taskExecuteResponseCommand);
 
-        // TaskResponseEvent
-        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
-                responseCommand.getEndTime(),
-                responseCommand.getProcessId(),
-                responseCommand.getAppIds(),
-                responseCommand.getTaskInstanceId(),
-                responseCommand.getVarPool(),
-                channel,
-                responseCommand.getProcessInstanceId()
-        );
-        taskResponseService.addResponse(taskResponseEvent);
+        TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+        taskEventService.addEvent(taskResponseEvent);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
similarity index 54%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index 1d188a1..c5ddc3d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -18,14 +18,12 @@
 package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,15 +35,15 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 
 /**
- * task ack processor
+ * task execute running processor
  */
 @Component
-public class TaskAckProcessor implements NettyRequestProcessor {
+public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningProcessor.class);
 
     @Autowired
-    private TaskResponseService taskResponseService;
+    private TaskEventService taskEventService;
 
     /**
      * task ack process
@@ -55,25 +53,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
-        logger.info("taskAckCommand : {}", taskAckCommand);
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
+        TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
+        logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
 
-        String workerAddress = ChannelUtils.toAddress(channel).getAddress();
-
-        ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
-
-        // TaskResponseEvent
-        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
-                taskAckCommand.getStartTime(),
-                workerAddress,
-                taskAckCommand.getExecutePath(),
-                taskAckCommand.getLogPath(),
-                taskAckCommand.getTaskInstanceId(),
-                channel,
-                taskAckCommand.getProcessInstanceId());
-
-        taskResponseService.addResponse(taskResponseEvent);
+        TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+        taskEventService.addEvent(taskEvent);
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
similarity index 69%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 2a6bd07..865eee5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 
 import java.util.Date;
 
@@ -27,7 +30,7 @@ import io.netty.channel.Channel;
 /**
  * task event
  */
-public class TaskResponseEvent {
+public class TaskEvent {
 
     /**
      * taskInstanceId
@@ -90,46 +93,45 @@ public class TaskResponseEvent {
     private Channel channel;
 
     private int processInstanceId;
-    
-    public static TaskResponseEvent newAck(ExecutionStatus state,
-                                           Date startTime,
-                                           String workerAddress,
-                                           String executePath,
-                                           String logPath,
-                                           int taskInstanceId,
-                                           Channel channel,
-                                           int processInstanceId) {
-        TaskResponseEvent event = new TaskResponseEvent();
-        event.setState(state);
-        event.setStartTime(startTime);
-        event.setWorkerAddress(workerAddress);
-        event.setExecutePath(executePath);
-        event.setLogPath(logPath);
+
+    public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstanceId, String workerAddress) {
+        TaskEvent event = new TaskEvent();
+        event.setProcessInstanceId(processInstanceId);
         event.setTaskInstanceId(taskInstanceId);
-        event.setEvent(Event.ACK);
+        event.setWorkerAddress(workerAddress);
+        event.setEvent(Event.DISPATCH);
+        return event;
+    }
+
+    public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
+        TaskEvent event = new TaskEvent();
+        event.setProcessInstanceId(command.getProcessInstanceId());
+        event.setTaskInstanceId(command.getTaskInstanceId());
+        event.setState(ExecutionStatus.of(command.getStatus()));
+        event.setStartTime(command.getStartTime());
+        event.setExecutePath(command.getExecutePath());
+        event.setLogPath(command.getLogPath());
         event.setChannel(channel);
-        event.setProcessInstanceId(processInstanceId);
+        event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+        event.setEvent(Event.RUNNING);
         return event;
     }
 
-    public static TaskResponseEvent newResult(ExecutionStatus state,
-                                              Date endTime,
-                                              int processId,
-                                              String appIds,
-                                              int taskInstanceId,
-                                              String varPool,
-                                              Channel channel,
-                                              int processInstanceId) {
-        TaskResponseEvent event = new TaskResponseEvent();
-        event.setState(state);
-        event.setEndTime(endTime);
-        event.setProcessId(processId);
-        event.setAppIds(appIds);
-        event.setTaskInstanceId(taskInstanceId);
-        event.setEvent(Event.RESULT);
-        event.setVarPool(varPool);
+    public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
+        TaskEvent event = new TaskEvent();
+        event.setProcessInstanceId(command.getProcessInstanceId());
+        event.setTaskInstanceId(command.getTaskInstanceId());
+        event.setState(ExecutionStatus.of(command.getStatus()));
+        event.setStartTime(command.getStartTime());
+        event.setExecutePath(command.getExecutePath());
+        event.setLogPath(command.getLogPath());
+        event.setEndTime(command.getEndTime());
+        event.setProcessId(command.getProcessId());
+        event.setAppIds(command.getAppIds());
+        event.setVarPool(command.getVarPool());
         event.setChannel(channel);
-        event.setProcessInstanceId(processInstanceId);
+        event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+        event.setEvent(Event.RESULT);
         return event;
     }
 
@@ -140,7 +142,7 @@ public class TaskResponseEvent {
     public void setVarPool(String varPool) {
         this.varPool = varPool;
     }
-    
+
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
similarity index 50%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index a0de0af..4984390 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -50,17 +50,17 @@ import io.netty.channel.Channel;
  * task manager
  */
 @Component
-public class TaskResponseService {
+public class TaskEventService {
 
     /**
      * logger
      */
-    private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
 
     /**
      * attemptQueue
      */
-    private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
 
     /**
      * process service
@@ -75,9 +75,9 @@ public class TaskResponseService {
     private DataQualityResultOperator dataQualityResultOperator;
 
     /**
-     * task response worker
+     * task event worker
      */
-    private Thread taskResponseWorker;
+    private Thread taskEventWorker;
 
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -87,19 +87,19 @@ public class TaskResponseService {
 
     @PostConstruct
     public void start() {
-        this.taskResponseWorker = new TaskResponseWorker();
-        this.taskResponseWorker.setName("StateEventResponseWorker");
-        this.taskResponseWorker.start();
+        this.taskEventWorker = new TaskEventWorker();
+        this.taskEventWorker.setName("TaskStateEventWorker");
+        this.taskEventWorker.start();
     }
 
     @PreDestroy
     public void stop() {
         try {
-            this.taskResponseWorker.interrupt();
+            this.taskEventWorker.interrupt();
             if (!eventQueue.isEmpty()) {
-                List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
+                List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
                 eventQueue.drainTo(remainEvents);
-                for (TaskResponseEvent event : remainEvents) {
+                for (TaskEvent event : remainEvents) {
                     this.persist(event);
                 }
             }
@@ -109,15 +109,15 @@ public class TaskResponseService {
     }
 
     /**
-     * put task to attemptQueue
+     * add event to queue
      *
-     * @param taskResponseEvent taskResponseEvent
+     * @param taskEvent taskEvent
      */
-    public void addResponse(TaskResponseEvent taskResponseEvent) {
+    public void addEvent(TaskEvent taskEvent) {
         try {
-            eventQueue.put(taskResponseEvent);
+            eventQueue.put(taskEvent);
         } catch (InterruptedException e) {
-            logger.error("put task : {} error :{}", taskResponseEvent, e);
+            logger.error("add task event : {} error :{}", taskEvent, e);
             Thread.currentThread().interrupt();
         }
     }
@@ -125,7 +125,7 @@ public class TaskResponseService {
     /**
      * task worker thread
      */
-    class TaskResponseWorker extends Thread {
+    class TaskEventWorker extends Thread {
 
         @Override
         public void run() {
@@ -133,8 +133,8 @@ public class TaskResponseService {
             while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
-                    TaskResponseEvent taskResponseEvent = eventQueue.take();
-                    persist(taskResponseEvent);
+                    TaskEvent taskEvent = eventQueue.take();
+                    persist(taskEvent);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
@@ -147,14 +147,14 @@ public class TaskResponseService {
     }
 
     /**
-     * persist  taskResponseEvent
+     * persist task event
      *
-     * @param taskResponseEvent taskResponseEvent
+     * @param taskEvent taskEvent
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
-        Event event = taskResponseEvent.getEvent();
-        int taskInstanceId = taskResponseEvent.getTaskInstanceId();
-        int processInstanceId = taskResponseEvent.getProcessInstanceId();
+    private void persist(TaskEvent taskEvent) {
+        Event event = taskEvent.getEvent();
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
 
         TaskInstance taskInstance;
         WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
@@ -165,75 +165,103 @@ public class TaskResponseService {
         }
 
         switch (event) {
-            case ACK:
-                handleAckEvent(taskResponseEvent, taskInstance);
+            case DISPATCH:
+                handleDispatchEvent(taskEvent, taskInstance);
+                // dispatch event do not need to submit state event
+                return;
+            case DELAY:
+            case RUNNING:
+                handleRunningEvent(taskEvent, taskInstance);
                 break;
             case RESULT:
-                handleResultEvent(taskResponseEvent, taskInstance);
+                handleResultEvent(taskEvent, taskInstance);
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
 
         StateEvent stateEvent = new StateEvent();
-        stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
-        stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
-        stateEvent.setExecutionStatus(taskResponseEvent.getState());
+        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskEvent.getState());
         stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
         workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
     /**
-     * handle ack event
+     * handle dispatch event
      */
-    private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
-        Channel channel = taskResponseEvent.getChannel();
+    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        if (taskInstance == null) {
+            logger.error("taskInstance is null");
+            return;
+        }
+        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
+            return;
+        }
+        taskInstance.setState(ExecutionStatus.DISPATCH);
+        taskInstance.setHost(taskEvent.getWorkerAddress());
+        processService.saveTaskInstance(taskInstance);
+    }
+
+    /**
+     * handle running event
+     */
+    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        Channel channel = taskEvent.getChannel();
         try {
             if (taskInstance != null) {
                 if (taskInstance.getState().typeIsFinished()) {
-                    logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
+                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
                 } else {
-                    processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
-                            taskResponseEvent.getStartTime(),
-                            taskResponseEvent.getWorkerAddress(),
-                            taskResponseEvent.getExecutePath(),
-                            taskResponseEvent.getLogPath()
-                    );
+                    taskInstance.setState(taskEvent.getState());
+                    taskInstance.setStartTime(taskEvent.getStartTime());
+                    taskInstance.setHost(taskEvent.getWorkerAddress());
+                    taskInstance.setLogPath(taskEvent.getLogPath());
+                    taskInstance.setExecutePath(taskEvent.getExecutePath());
+                    taskInstance.setPid(taskEvent.getProcessId());
+                    taskInstance.setAppLink(taskEvent.getAppIds());
+                    processService.saveTaskInstance(taskInstance);
                 }
             }
             // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
-            DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskAckCommand.convert2Command());
+            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         } catch (Exception e) {
             logger.error("worker ack master error", e);
-            DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskAckCommand.convert2Command());
+            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         }
     }
 
     /**
      * handle result event
      */
-    private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
-        Channel channel = taskResponseEvent.getChannel();
+    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+        Channel channel = taskEvent.getChannel();
         try {
             if (taskInstance != null) {
-                dataQualityResultOperator.operateDqExecuteResult(taskResponseEvent, taskInstance);
-
-                processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
-                        taskResponseEvent.getEndTime(),
-                        taskResponseEvent.getProcessId(),
-                        taskResponseEvent.getAppIds(),
-                        taskResponseEvent.getVarPool()
-                );
+                dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+                taskInstance.setStartTime(taskEvent.getStartTime());
+                taskInstance.setHost(taskEvent.getWorkerAddress());
+                taskInstance.setLogPath(taskEvent.getLogPath());
+                taskInstance.setExecutePath(taskEvent.getExecutePath());
+                taskInstance.setPid(taskEvent.getProcessId());
+                taskInstance.setAppLink(taskEvent.getAppIds());
+                taskInstance.setState(taskEvent.getState());
+                taskInstance.setEndTime(taskEvent.getEndTime());
+                taskInstance.setVarPool(taskEvent.getVarPool());
+                processService.changeOutParam(taskInstance);
+                processService.saveTaskInstance(taskInstance);
             }
             // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-            DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskResponseCommand.convert2Command());
+            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         } catch (Exception e) {
             logger.error("worker response master error", e);
-            DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskResponseCommand.convert2Command());
+            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index c53a02e..38fef8f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -259,11 +259,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
         // verify tenant is null
         if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    null);
+            taskInstance.setState(ExecutionStatus.FAILURE);
+            processService.saveTaskInstance(taskInstance);
             return null;
         }
         // set queue for process instance, user-specified queue takes precedence over tenant queue
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
index 906c3a9..d172228 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.CheckType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqFailureStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OperatorType;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -59,7 +59,7 @@ public class DataQualityResultOperator {
      * @param taskResponseEvent
      * @param taskInstance
      */
-    public void operateDqExecuteResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+    public void operateDqExecuteResult(TaskEvent taskResponseEvent, TaskInstance taskInstance) {
         if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
 
             ProcessInstance processInstance =
@@ -92,7 +92,7 @@ public class DataQualityResultOperator {
      * @param dqExecuteResult
      * @param processInstance
      */
-    private void checkDqExecuteResult(TaskResponseEvent taskResponseEvent,
+    private void checkDqExecuteResult(TaskEvent taskResponseEvent,
                                       DqExecuteResult dqExecuteResult,
                                       ProcessInstance processInstance) {
         if (isFailure(dqExecuteResult)) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 7429dc2..1a744ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -231,6 +231,7 @@ public class DependentExecute {
 
         if (state.typeIsRunning()
                 || state == ExecutionStatus.SUBMITTED_SUCCESS
+                || state == ExecutionStatus.DISPATCH
                 || state == ExecutionStatus.WAITING_THREAD) {
             return DependResult.WAITING;
         } else {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
new file mode 100644
index 0000000..fc320ab
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+/**
+ * task ack processor test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskEvent.class})
+public class TaskAckProcessorTest {
+
+    private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
+    private TaskEventService taskEventService;
+    private ProcessService processService;
+    private TaskExecuteRunningCommand taskExecuteRunningCommand;
+    private TaskEvent taskResponseEvent;
+    private Channel channel;
+
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+
+        taskEventService = PowerMockito.mock(TaskEventService.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskEventService.class)).thenReturn(taskEventService);
+
+        processService = PowerMockito.mock(ProcessService.class);
+        PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
+
+        taskExecuteRunningProcessor = new TaskExecuteRunningProcessor();
+
+        channel = PowerMockito.mock(Channel.class);
+        taskResponseEvent = PowerMockito.mock(TaskEvent.class);
+
+        taskExecuteRunningCommand = new TaskExecuteRunningCommand();
+        taskExecuteRunningCommand.setStatus(1);
+        taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
+        taskExecuteRunningCommand.setHost("localhost");
+        taskExecuteRunningCommand.setLogPath("/temp/worker.log");
+        taskExecuteRunningCommand.setStartTime(new Date());
+        taskExecuteRunningCommand.setTaskInstanceId(1);
+        taskExecuteRunningCommand.setProcessInstanceId(1);
+    }
+
+    @Test
+    public void testProcess() {
+//        Command command = taskExecuteAckCommand.convert2Command();
+//        Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType());
+//        InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345);
+//        PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress);
+//        PowerMockito.mockStatic(TaskResponseEvent.class);
+//
+//        PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), channel))
+//                .thenReturn(taskResponseEvent);
+//        TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class);
+//        PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
+//
+//        taskAckProcessor.process(channel,command);
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 9794bdd..9a054b6 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -19,9 +19,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.net.InetSocketAddress;
 import java.util.Date;
 
 import org.junit.After;
@@ -42,41 +47,52 @@ public class TaskResponseServiceTest {
     private ProcessService processService;
 
     @InjectMocks
-    TaskResponseService taskRspService;
+    TaskEventService taskEventService;
 
     @Mock
     private Channel channel;
 
-    private TaskResponseEvent ackEvent;
+    private TaskEvent ackEvent;
 
-    private TaskResponseEvent resultEvent;
+    private TaskEvent resultEvent;
 
     private TaskInstance taskInstance;
 
     @Mock
     private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
 
+    @Mock
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    @Mock
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
     @Before
     public void before() {
-        taskRspService.start();
-
-        ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,
-                new Date(),
-                "127.*.*.*",
-                "path",
-                "logPath",
-                22,
-                channel,
-                1);
-
-        resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS,
-                new Date(),
-                1,
-                "ids",
-                22,
-                "varPol",
-                channel,
-                1);
+        taskEventService.start();
+
+        Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
+
+        TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
+        taskExecuteRunningCommand.setProcessId(1);
+        taskExecuteRunningCommand.setTaskInstanceId(22);
+        taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
+        taskExecuteRunningCommand.setExecutePath("path");
+        taskExecuteRunningCommand.setLogPath("logPath");
+        taskExecuteRunningCommand.setHost("127.*.*.*");
+        taskExecuteRunningCommand.setStartTime(new Date());
+
+        ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+
+        TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
+        taskExecuteResponseCommand.setProcessInstanceId(1);
+        taskExecuteResponseCommand.setTaskInstanceId(22);
+        taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
+        taskExecuteResponseCommand.setEndTime(new Date());
+        taskExecuteResponseCommand.setVarPool("varPol");
+        taskExecuteResponseCommand.setAppIds("ids");
+        taskExecuteResponseCommand.setProcessId(1);
+        resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
 
         taskInstance = new TaskInstance();
         taskInstance.setId(22);
@@ -87,14 +103,14 @@ public class TaskResponseServiceTest {
     public void testAddResponse() {
         Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
         Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
-        taskRspService.addResponse(ackEvent);
-        taskRspService.addResponse(resultEvent);
+        taskEventService.addEvent(ackEvent);
+        taskEventService.addEvent(resultEvent);
     }
 
     @After
     public void after() {
-        if (taskRspService != null) {
-            taskRspService.stop();
+        if (taskEventService != null) {
+            taskEventService.stop();
         }
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index a502242..5718872 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -69,24 +69,24 @@ public enum CommandType {
     TASK_EXECUTE_REQUEST,
 
     /**
-     * execute task ack
+     * task execute running, from worker to master
      */
-    TASK_EXECUTE_ACK,
+    TASK_EXECUTE_RUNNING,
 
     /**
-     * execute task response
+     * task execute running ack, from master to worker
      */
-    TASK_EXECUTE_RESPONSE,
+    TASK_EXECUTE_RUNNING_ACK,
 
     /**
-     * db task ack
+     * task execute response, from worker to master
      */
-    DB_TASK_ACK,
+    TASK_EXECUTE_RESPONSE,
 
     /**
-     * db task response
+     * task execute response ack, from master to worker
      */
-    DB_TASK_RESPONSE,
+    TASK_EXECUTE_RESPONSE_ACK,
 
     /**
      * kill task
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
index fd9c428..41ee1ef 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
@@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.DB_TASK_RESPONSE);
+        command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
similarity index 83%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
index 9bd86cb..f3df257 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import java.io.Serializable;
 
 /**
- * db task final result response command
+ * task execute response ack command
+ * from master to worker
  */
-public class DBTaskResponseCommand implements Serializable {
+public class TaskExecuteResponseAckCommand implements Serializable {
 
     private int taskInstanceId;
     private int status;
 
-    public DBTaskResponseCommand() {
+    public TaskExecuteResponseAckCommand() {
         super();
     }
 
-    public DBTaskResponseCommand(int status, int taskInstanceId) {
+    public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
         this.status = status;
         this.taskInstanceId = taskInstanceId;
     }
@@ -61,7 +62,7 @@ public class DBTaskResponseCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.DB_TASK_RESPONSE);
+        command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
@@ -69,7 +70,7 @@ public class DBTaskResponseCommand implements Serializable {
 
     @Override
     public String toString() {
-        return "DBTaskResponseCommand{"
+        return "TaskExecuteResponseAckCommand{"
             + "taskInstanceId=" + taskInstanceId
             + ", status=" + status
             + '}';
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index eafb803..4574b88 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.Date;
 
 /**
- *  execute task response command
+ * execute task response command
  */
 public class TaskExecuteResponseCommand implements Serializable {
 
@@ -36,23 +36,43 @@ public class TaskExecuteResponseCommand implements Serializable {
     }
 
     /**
-     *  task instance id
+     * task instance id
      */
     private int taskInstanceId;
 
     /**
      * process instance id
      */
-    private  int processInstanceId;
+    private int processInstanceId;
 
     /**
-     *  status
+     * status
      */
     private int status;
 
+    /**
+     * startTime
+     */
+    private Date startTime;
+
+    /**
+     * host
+     */
+    private String host;
+
+    /**
+     * logPath
+     */
+    private String logPath;
 
     /**
-     *  end time
+     * executePath
+     */
+    private String executePath;
+
+
+    /**
+     * end time
      */
     private Date endTime;
 
@@ -72,6 +92,38 @@ public class TaskExecuteResponseCommand implements Serializable {
      */
     private String varPool;
 
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getLogPath() {
+        return logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public String getExecutePath() {
+        return executePath;
+    }
+
+    public void setExecutePath(String executePath) {
+        this.executePath = executePath;
+    }
+
     public void setVarPool(String varPool) {
         this.varPool = varPool;
     }
@@ -79,7 +131,7 @@ public class TaskExecuteResponseCommand implements Serializable {
     public String getVarPool() {
         return varPool;
     }
-    
+
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
@@ -122,6 +174,7 @@ public class TaskExecuteResponseCommand implements Serializable {
 
     /**
      * package response command
+     *
      * @return command
      */
     public Command convert2Command() {
@@ -136,10 +189,16 @@ public class TaskExecuteResponseCommand implements Serializable {
     public String toString() {
         return "TaskExecuteResponseCommand{"
                 + "taskInstanceId=" + taskInstanceId
+                + ", processInstanceId=" + processInstanceId
                 + ", status=" + status
+                + ", startTime=" + startTime
                 + ", endTime=" + endTime
+                + ", host=" + host
+                + ", logPath=" + logPath
+                + ", executePath=" + executePath
                 + ", processId=" + processId
                 + ", appIds='" + appIds + '\''
+                + ", varPool=" + varPool
                 + '}';
     }
 
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
similarity index 80%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
index 4797104..b0bb666 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import java.io.Serializable;
 
 /**
- * db task ack request command
+ * task execute running ack command
+ * from master to worker
  */
-public class DBTaskAckCommand implements Serializable {
+public class TaskExecuteRunningAckCommand implements Serializable {
 
     private int taskInstanceId;
     private int status;
 
-    public DBTaskAckCommand() {
+    public TaskExecuteRunningAckCommand() {
         super();
     }
 
-    public DBTaskAckCommand(int status, int taskInstanceId) {
+    public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
         this.status = status;
         this.taskInstanceId = taskInstanceId;
     }
@@ -61,7 +62,7 @@ public class DBTaskAckCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.DB_TASK_ACK);
+        command.setType(CommandType.TASK_EXECUTE_RUNNING_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
@@ -69,6 +70,6 @@ public class DBTaskAckCommand implements Serializable {
 
     @Override
     public String toString() {
-        return "DBTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
+        return "TaskExecuteRunningAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
similarity index 81%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
index a3dbed5..0a2eac2 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
@@ -23,9 +23,10 @@ import java.io.Serializable;
 import java.util.Date;
 
 /**
- * execute task request command
+ * task execute running command
+ * from worker to master
  */
-public class TaskExecuteAckCommand implements Serializable {
+public class TaskExecuteRunningCommand implements Serializable {
 
     /**
      * taskInstanceId
@@ -62,6 +63,16 @@ public class TaskExecuteAckCommand implements Serializable {
      */
     private String executePath;
 
+    /**
+     * processId
+     */
+    private int processId;
+
+    /**
+     * appIds
+     */
+    private String appIds;
+
     public Date getStartTime() {
         return startTime;
     }
@@ -94,6 +105,14 @@ public class TaskExecuteAckCommand implements Serializable {
         this.taskInstanceId = taskInstanceId;
     }
 
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
     public String getLogPath() {
         return logPath;
     }
@@ -110,6 +129,22 @@ public class TaskExecuteAckCommand implements Serializable {
         this.executePath = executePath;
     }
 
+    public int getProcessId() {
+        return processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public String getAppIds() {
+        return appIds;
+    }
+
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
     /**
      * package request command
      *
@@ -117,7 +152,7 @@ public class TaskExecuteAckCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_ACK);
+        command.setType(CommandType.TASK_EXECUTE_RUNNING);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
@@ -125,22 +160,16 @@ public class TaskExecuteAckCommand implements Serializable {
 
     @Override
     public String toString() {
-        return "TaskExecuteAckCommand{"
+        return "TaskExecuteRunningCommand{"
                 + "taskInstanceId=" + taskInstanceId
+                + ", processInstanceId='" + processInstanceId + '\''
                 + ", startTime=" + startTime
                 + ", host='" + host + '\''
                 + ", status=" + status
                 + ", logPath='" + logPath + '\''
                 + ", executePath='" + executePath + '\''
-                + ", processInstanceId='" + processInstanceId + '\''
+                + ", processId=" + processId + '\''
+                + ", appIds='" + appIds + '\''
                 + '}';
     }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf377d8..2255c08 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,13 +17,19 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+
+import static java.util.stream.Collectors.toSet;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -124,11 +130,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -143,17 +148,16 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 
 /**
  * process relative dao that some mappers in this.
@@ -164,6 +168,7 @@ public class ProcessService {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.DISPATCH.ordinal(),
             ExecutionStatus.RUNNING_EXECUTION.ordinal(),
             ExecutionStatus.DELAY_EXECUTION.ordinal(),
             ExecutionStatus.READY_PAUSE.ordinal(),
@@ -266,8 +271,8 @@ public class ProcessService {
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
-     * @param logger  logger
-     * @param host    host
+     * @param logger logger
+     * @param host host
      * @param command found command
      * @return process instance
      */
@@ -368,7 +373,7 @@ public class ProcessService {
     /**
      * set process waiting thread
      *
-     * @param command         command
+     * @param command command
      * @param processInstance processInstance
      * @return process instance
      */
@@ -581,8 +586,6 @@ public class ProcessService {
 
     /**
      * recursive delete all task instance by process instance id
-     *
-     * @param processInstanceId
      */
     public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
         List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
@@ -603,7 +606,7 @@ public class ProcessService {
      * recursive query sub process definition id by parent id.
      *
      * @param parentCode parentCode
-     * @param ids        ids
+     * @param ids ids
      */
     public void recurseFindSubProcess(long parentCode, List<Long> ids) {
         List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
@@ -628,7 +631,7 @@ public class ProcessService {
      * create recovery waiting thread  command and delete origin command at the same time.
      * if the recovery command is exists, only update the field update_time
      *
-     * @param originCommand   originCommand
+     * @param originCommand originCommand
      * @param processInstance processInstance
      */
     public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
@@ -684,7 +687,7 @@ public class ProcessService {
     /**
      * get schedule time from command
      *
-     * @param command  command
+     * @param command command
      * @param cmdParam cmdParam map
      * @return date
      */
@@ -713,8 +716,8 @@ public class ProcessService {
      * generate a new work process instance from command.
      *
      * @param processDefinition processDefinition
-     * @param command           command
-     * @param cmdParam          cmdParam map
+     * @param command command
+     * @param cmdParam cmdParam map
      * @return process instance
      */
     private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@@ -799,7 +802,7 @@ public class ProcessService {
      * use definition creator's tenant.
      *
      * @param tenantId tenantId
-     * @param userId   userId
+     * @param userId userId
      * @return tenant
      */
     public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -837,7 +840,7 @@ public class ProcessService {
     /**
      * check command parameters is valid
      *
-     * @param command  command
+     * @param command command
      * @param cmdParam cmdParam map
      * @return whether command param is valid
      */
@@ -857,7 +860,7 @@ public class ProcessService {
      * construct process instance according to one command.
      *
      * @param command command
-     * @param host    host
+     * @param host host
      * @return process instance
      */
     protected ProcessInstance constructProcessInstance(Command command, String host) {
@@ -1036,7 +1039,7 @@ public class ProcessService {
      * return complement data if the process start with complement data
      *
      * @param processInstance processInstance
-     * @param command         command
+     * @param command command
      * @return command type
      */
     private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@@ -1051,8 +1054,8 @@ public class ProcessService {
      * initialize complement data parameters
      *
      * @param processDefinition processDefinition
-     * @param processInstance   processInstance
-     * @param cmdParam          cmdParam
+     * @param processInstance processInstance
+     * @param cmdParam cmdParam
      */
     private void initComplementDataParam(ProcessDefinition processDefinition,
                                          ProcessInstance processInstance,
@@ -1125,7 +1128,7 @@ public class ProcessService {
      * only the keys doesn't in sub process global would be joined.
      *
      * @param parentGlobalParams parentGlobalParams
-     * @param subGlobalParams    subGlobalParams
+     * @param subGlobalParams subGlobalParams
      * @return global params join
      */
     private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@@ -1192,7 +1195,7 @@ public class ProcessService {
      * submit sub process to command
      *
      * @param processInstance processInstance
-     * @param taskInstance    taskInstance
+     * @param taskInstance taskInstance
      * @return task instance
      */
     @Transactional(rollbackFor = Exception.class)
@@ -1223,7 +1226,7 @@ public class ProcessService {
      * set map {parent instance id, task instance id, 0(child instance id)}
      *
      * @param parentInstance parentInstance
-     * @param parentTask     parentTask
+     * @param parentTask parentTask
      * @return process instance map
      */
     private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@@ -1252,7 +1255,7 @@ public class ProcessService {
      * find previous task work process map.
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param parentTask            parentTask
+     * @param parentTask parentTask
      * @return process instance map
      */
     private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@@ -1278,7 +1281,7 @@ public class ProcessService {
      * create sub work process command
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param task                  task
+     * @param task task
      */
     public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
         if (!task.isSubProcess()) {
@@ -1412,7 +1415,7 @@ public class ProcessService {
      * update sub process definition
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param childDefinitionCode   childDefinitionId
+     * @param childDefinitionCode childDefinitionId
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
         ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
@@ -1427,7 +1430,7 @@ public class ProcessService {
     /**
      * submit task to mysql
      *
-     * @param taskInstance    taskInstance
+     * @param taskInstance taskInstance
      * @param processInstance processInstance
      * @return task instance
      */
@@ -1463,7 +1466,7 @@ public class ProcessService {
      * return stop if work process state is ready stop
      * if all of above are not satisfied, return submit success
      *
-     * @param taskInstance    taskInstance
+     * @param taskInstance taskInstance
      * @param processInstance processInstance
      * @return process instance state
      */
@@ -1476,6 +1479,7 @@ public class ProcessService {
                 state == ExecutionStatus.RUNNING_EXECUTION
                         || state == ExecutionStatus.DELAY_EXECUTION
                         || state == ExecutionStatus.KILL
+                        || state == ExecutionStatus.DISPATCH
         ) {
             return state;
         }
@@ -1689,7 +1693,7 @@ public class ProcessService {
      * get id list by task state
      *
      * @param instanceId instanceId
-     * @param state      state
+     * @param state state
      * @return task instance states
      */
     public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
@@ -1744,7 +1748,7 @@ public class ProcessService {
      * find work process map by parent process id and parent task id.
      *
      * @param parentWorkProcessId parentWorkProcessId
-     * @param parentTaskId        parentTaskId
+     * @param parentTaskId parentTaskId
      * @return process instance map
      */
     public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
@@ -1766,7 +1770,7 @@ public class ProcessService {
      * find sub process instance
      *
      * @param parentProcessId parentProcessId
-     * @param parentTaskId    parentTaskId
+     * @param parentTaskId parentTaskId
      * @return process instance
      */
     public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
@@ -1796,29 +1800,6 @@ public class ProcessService {
     }
 
     /**
-     * change task state
-     *
-     * @param state       state
-     * @param startTime   startTime
-     * @param host        host
-     * @param executePath executePath
-     * @param logPath     logPath
-     */
-    public void changeTaskState(TaskInstance taskInstance,
-                                ExecutionStatus state,
-                                Date startTime,
-                                String host,
-                                String executePath,
-                                String logPath) {
-        taskInstance.setState(state);
-        taskInstance.setStartTime(startTime);
-        taskInstance.setHost(host);
-        taskInstance.setExecutePath(executePath);
-        taskInstance.setLogPath(logPath);
-        saveTaskInstance(taskInstance);
-    }
-
-    /**
      * update process instance
      *
      * @param processInstance processInstance
@@ -1829,27 +1810,6 @@ public class ProcessService {
     }
 
     /**
-     * change task state
-     *
-     * @param state   state
-     * @param endTime endTime
-     * @param varPool varPool
-     */
-    public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
-                                Date endTime,
-                                int processId,
-                                String appIds,
-                                String varPool) {
-        taskInstance.setPid(processId);
-        taskInstance.setAppLink(appIds);
-        taskInstance.setState(state);
-        taskInstance.setEndTime(endTime);
-        taskInstance.setVarPool(varPool);
-        changeOutParam(taskInstance);
-        saveTaskInstance(taskInstance);
-    }
-
-    /**
      * for show in page of taskInstance
      */
     public void changeOutParam(TaskInstance taskInstance) {
@@ -2006,7 +1966,7 @@ public class ProcessService {
      * update process instance state by id
      *
      * @param processInstanceId processInstanceId
-     * @param executionStatus   executionStatus
+     * @param executionStatus executionStatus
      * @return update process result
      */
     public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
@@ -2042,7 +2002,7 @@ public class ProcessService {
     /**
      * find tenant code by resource name
      *
-     * @param resName      resource name
+     * @param resName resource name
      * @param resourceType resource type
      * @return tenant code
      */
@@ -2080,7 +2040,7 @@ public class ProcessService {
      * find last scheduler process instance in the date interval
      *
      * @param definitionCode definitionCode
-     * @param dateInterval   dateInterval
+     * @param dateInterval dateInterval
      * @return process instance
      */
     public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
@@ -2093,7 +2053,7 @@ public class ProcessService {
      * find last manual process instance interval
      *
      * @param definitionCode process definition code
-     * @param dateInterval   dateInterval
+     * @param dateInterval dateInterval
      * @return process instance
      */
     public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
@@ -2106,8 +2066,8 @@ public class ProcessService {
      * find last running process instance
      *
      * @param definitionCode process definition code
-     * @param startTime      start time
-     * @param endTime        end time
+     * @param startTime start time
+     * @param endTime end time
      * @return process instance
      */
     public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
@@ -2191,7 +2151,7 @@ public class ProcessService {
     /**
      * list unauthorized udf function
      *
-     * @param userId     user id
+     * @param userId user id
      * @param needChecks data source id array
      * @return unauthorized udf function list
      */
@@ -2599,7 +2559,7 @@ public class ProcessService {
      * add authorized resources
      *
      * @param ownResources own resources
-     * @param userId       userId
+     * @param userId userId
      */
     private void addAuthorizedResources(List<Resource> ownResources, int userId) {
         List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
@@ -2742,12 +2702,7 @@ public class ProcessService {
     /**
      * the first time (when submit the task ) get the resource of the task group
      *
-     * @param taskId    task id
-     * @param taskName
-     * @param groupId
-     * @param processId
-     * @param priority
-     * @return
+     * @param taskId task id
      */
     public boolean acquireTaskGroup(int taskId,
                                     String taskName, int groupId,
@@ -2788,9 +2743,6 @@ public class ProcessService {
 
     /**
      * try to get the task group resource(when other task release the resource)
-     *
-     * @param taskGroupQueue
-     * @return
      */
     public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
         TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
@@ -2877,11 +2829,11 @@ public class ProcessService {
     /**
      * insert into task group queue
      *
-     * @param taskId    task id
-     * @param taskName  task name
-     * @param groupId   group id
+     * @param taskId task id
+     * @param taskName task name
+     * @param groupId group id
      * @param processId process id
-     * @param priority  priority
+     * @param priority priority
      * @return result and msg code
      */
     public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 882ab59..581c73c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -208,6 +208,16 @@ public class TaskExecutionContext {
     private ResourceParametersHelper resourceParametersHelper;
 
     /**
+     * endTime
+     */
+    private Date endTime;
+
+    /**
+     * sql TaskExecutionContext
+     */
+    private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+    /**
      * resources full name and tenant code
      */
     private Map<String, String> resources;
@@ -538,12 +548,61 @@ public class TaskExecutionContext {
         this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
     }
 
+    public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
+        this.currentExecutionStatus = currentExecutionStatus;
+    }
+
     public ExecutionStatus getCurrentExecutionStatus() {
         return currentExecutionStatus;
     }
 
-    public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
-        this.currentExecutionStatus = currentExecutionStatus;
+    public Date getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Date endTime) {
+        this.endTime = endTime;
+    }
+
+    @Override
+    public String toString() {
+        return "TaskExecutionContext{"
+                + "taskInstanceId=" + taskInstanceId
+                + ", taskName='" + taskName + '\''
+                + ", currentExecutionStatus=" + currentExecutionStatus
+                + ", firstSubmitTime=" + firstSubmitTime
+                + ", startTime=" + startTime
+                + ", taskType='" + taskType + '\''
+                + ", host='" + host + '\''
+                + ", executePath='" + executePath + '\''
+                + ", logPath='" + logPath + '\''
+                + ", taskJson='" + taskJson + '\''
+                + ", processId=" + processId
+                + ", processDefineCode=" + processDefineCode
+                + ", processDefineVersion=" + processDefineVersion
+                + ", appIds='" + appIds + '\''
+                + ", processInstanceId=" + processInstanceId
+                + ", scheduleTime=" + scheduleTime
+                + ", globalParams='" + globalParams + '\''
+                + ", executorId=" + executorId
+                + ", cmdTypeIfComplement=" + cmdTypeIfComplement
+                + ", tenantCode='" + tenantCode + '\''
+                + ", queue='" + queue + '\''
+                + ", projectCode=" + projectCode
+                + ", taskParams='" + taskParams + '\''
+                + ", envFile='" + envFile + '\''
+                + ", dryRun='" + dryRun + '\''
+                + ", definedParams=" + definedParams
+                + ", taskAppId='" + taskAppId + '\''
+                + ", taskTimeoutStrategy=" + taskTimeoutStrategy
+                + ", taskTimeout=" + taskTimeout
+                + ", workerGroup='" + workerGroup + '\''
+                + ", environmentConfig='" + environmentConfig + '\''
+                + ", delayTime=" + delayTime
+                + ", resources=" + resources
+                + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+                + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+                + '}';
     }
 
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index 7295f43..8d2774c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -62,7 +62,9 @@ public enum ExecutionStatus {
     FORCED_SUCCESS(13, "forced success"),
     SERIAL_WAIT(14, "serial wait"),
     READY_BLOCK(15, "ready block"),
-    BLOCK(16, "block");
+    BLOCK(16, "block"),
+    DISPATCH(17, "dispatch"),
+    ;
 
     ExecutionStatus(int code, String descp) {
         this.code = code;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 4311612..45401b6 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -28,10 +28,10 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
@@ -111,10 +111,10 @@ public class WorkerServer implements IStoppable {
     private TaskKillProcessor taskKillProcessor;
 
     @Autowired
-    private DBTaskAckProcessor dbTaskAckProcessor;
+    private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
 
     @Autowired
-    private DBTaskResponseProcessor dbTaskResponseProcessor;
+    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
 
     @Autowired
     private HostUpdateProcessor hostUpdateProcessor;
@@ -143,8 +143,8 @@ public class WorkerServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, dbTaskAckProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, dbTaskResponseProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
 
         // logger server
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index aaf557b..f28990b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -30,28 +30,30 @@ public class ResponseCache {
 
     private static final ResponseCache instance = new ResponseCache();
 
-    private ResponseCache(){}
+    private ResponseCache() {
+    }
 
     public static ResponseCache get() {
         return instance;
     }
 
-    private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
-    private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+    private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
+    private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
 
     /**
      * cache response
+     *
      * @param taskInstanceId taskInstanceId
      * @param command command
      * @param event event ACK/RESULT
      */
     public void cache(Integer taskInstanceId, Command command, Event event) {
         switch (event) {
-            case ACK:
-                ackCache.put(taskInstanceId,command);
+            case RUNNING:
+                runningCache.put(taskInstanceId, command);
                 break;
             case RESULT:
-                responseCache.put(taskInstanceId,command);
+                responseCache.put(taskInstanceId, command);
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
@@ -59,15 +61,17 @@ public class ResponseCache {
     }
 
     /**
-     * remove ack cache
+     * remove running cache
+     *
      * @param taskInstanceId taskInstanceId
      */
-    public void removeAckCache(Integer taskInstanceId) {
-        ackCache.remove(taskInstanceId);
+    public void removeRunningCache(Integer taskInstanceId) {
+        runningCache.remove(taskInstanceId);
     }
 
     /**
-     * remove reponse cache
+     * remove response cache
+     *
      * @param taskInstanceId taskInstanceId
      */
     public void removeResponseCache(Integer taskInstanceId) {
@@ -75,18 +79,20 @@ public class ResponseCache {
     }
 
     /**
-     * getAckCache
+     * get running cache
+     *
      * @return getAckCache
      */
-    public Map<Integer,Command> getAckCache() {
-        return ackCache;
+    public Map<Integer, Command> getRunningCache() {
+        return runningCache;
     }
 
     /**
      * getResponseCache
+     *
      * @return getResponseCache
      */
-    public Map<Integer,Command> getResponseCache() {
+    public Map<Integer, Command> getResponseCache() {
         return responseCache;
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 09b2c3a..9a0e894 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,16 +19,25 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 
+import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import io.netty.channel.Channel;
@@ -45,6 +54,12 @@ public class TaskCallbackService {
     private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
     private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
 
+    @Autowired
+    private TaskExecuteResponseAckProcessor taskExecuteRunningProcessor;
+
+    @Autowired
+    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+
     /**
      * remote channels
      */
@@ -58,15 +73,15 @@ public class TaskCallbackService {
     public TaskCallbackService() {
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
-        this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
-        this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
     }
 
     /**
      * add callback channel
      *
      * @param taskInstanceId taskInstanceId
-     * @param channel        channel
+     * @param channel channel
      */
     public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
         REMOTE_CHANNELS.put(taskInstanceId, channel);
@@ -129,25 +144,12 @@ public class TaskCallbackService {
     }
 
     /**
-     * send ack
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param command        command
-     */
-    public void sendAck(int taskInstanceId, Command command) {
-        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            nettyRemoteChannel.writeAndFlush(command);
-        }
-    }
-
-    /**
      * send result
      *
      * @param taskInstanceId taskInstanceId
-     * @param command        command
+     * @param command command
      */
-    public void sendResult(int taskInstanceId, Command command) {
+    public void send(int taskInstanceId, Command command) {
         NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
         if (nettyRemoteChannel != null) {
             nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@@ -161,6 +163,99 @@ public class TaskCallbackService {
                 }
             });
         }
+    }
+
+    /**
+     * build task execute running command
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @return TaskExecuteAckCommand
+     */
+    private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
+        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        command.setLogPath(taskExecutionContext.getLogPath());
+        command.setHost(taskExecutionContext.getHost());
+        command.setStartTime(taskExecutionContext.getStartTime());
+        command.setExecutePath(taskExecutionContext.getExecutePath());
+        return command;
+    }
+
+    /**
+     * build task execute response command
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @return TaskExecuteResponseCommand
+     */
+    private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
+        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        command.setLogPath(taskExecutionContext.getLogPath());
+        command.setExecutePath(taskExecutionContext.getExecutePath());
+        command.setAppIds(taskExecutionContext.getAppIds());
+        command.setProcessId(taskExecutionContext.getProcessId());
+        command.setHost(taskExecutionContext.getHost());
+        command.setStartTime(taskExecutionContext.getStartTime());
+        command.setEndTime(taskExecutionContext.getEndTime());
+        command.setVarPool(taskExecutionContext.getVarPool());
+        command.setExecutePath(taskExecutionContext.getExecutePath());
+        return command;
+    }
+
+    /**
+     * build TaskKillResponseCommand
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @return build TaskKillResponseCommand
+     */
+    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
+        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+        taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+        return taskKillResponseCommand;
+    }
+
+    /**
+     * send task execute running command
+     * todo unified callback command
+     */
+    public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
+        // add response cache
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
+        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+    }
+
+    /**
+     * send task execute delay command
+     * todo unified callback command
+     */
+    public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
+        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+    }
+
+    /**
+     * send task execute response command
+     * todo unified callback command
+     */
+    public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
+        // add response cache
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT);
+        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+    }
 
+    public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
+        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
+        send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+        TaskCallbackService.remove(taskExecutionContext.getTaskInstanceId());
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 88021e1..29da668 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -30,12 +29,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -128,6 +125,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
 
         if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+            if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+                OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+            }
+
+            // check if the OS user exists
+            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+                logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
+                        taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
+                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                taskExecutionContext.setEndTime(new Date());
+                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                return;
+            }
+
             // local execute path
             String execLocalPath = getExecLocalPath(taskExecutionContext);
             logger.info("task instance local execute path : {}", execLocalPath);
@@ -135,12 +147,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
             try {
                 FileUtils.createWorkDirIfAbsent(execLocalPath);
-                if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
-                    OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
-                }
             } catch (Throwable ex) {
-                logger.error("create execLocalPath: {}", execLocalPath, ex);
+                logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
+                logger.error("create executeLocalPath fail", ex);
                 TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                return;
             }
         }
 
@@ -153,48 +166,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
             taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
             taskExecutionContext.setStartTime(null);
-        } else {
-            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-            taskExecutionContext.setStartTime(new Date());
+            taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
         }
 
-        this.doAck(taskExecutionContext);
-
         // submit task to manager
-        if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) {
-            logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize());
+        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
+        if (!offer) {
+            logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
+                    workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
         }
     }
 
-    private void doAck(TaskExecutionContext taskExecutionContext) {
-        // tell master that task is in executing
-        TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
-        taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
-    }
-
-    /**
-     * build ack command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteAckCommand
-     */
-    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
-        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
-        ackCommand.setHost(taskExecutionContext.getHost());
-        ackCommand.setStartTime(taskExecutionContext.getStartTime());
-
-        ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
-
-        taskExecutionContext.setLogPath(ackCommand.getLogPath());
-        ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-
-        return ackCommand;
-    }
-
     /**
      * get execute local path
      *
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
similarity index 60%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
index bedc5a6..e080842 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 
@@ -34,31 +34,31 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 
 /**
- *  db task response processor
+ * task execute running ack, from master to worker
  */
 @Component
-public class DBTaskResponseProcessor implements NettyRequestProcessor {
+public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
                 String.format("invalid command type : %s", command.getType()));
 
-        DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
-                command.getBody(), DBTaskResponseCommand.class);
+        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
+                command.getBody(), TaskExecuteResponseAckCommand.class);
 
-        if (taskResponseCommand == null) {
-            logger.error("dBTask Response  command is null");
+        if (taskExecuteResponseAckCommand == null) {
+            logger.error("task execute response ack command is null");
             return;
         }
-        logger.info("dBTask Response command : {}", taskResponseCommand);
+        logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
 
-        if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
-            ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
-            TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
-            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
+        if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+            ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
+            TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId());
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
similarity index 64%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 7a80f84..9d74dc5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 
@@ -34,29 +34,29 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 
 /**
- *  db task ack processor
+ * task execute running ack processor
  */
 @Component
-public class DBTaskAckProcessor implements NettyRequestProcessor {
+public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
                 String.format("invalid command type : %s", command.getType()));
 
-        DBTaskAckCommand taskAckCommand = JSONUtils.parseObject(
-                command.getBody(), DBTaskAckCommand.class);
+        TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
+                command.getBody(), TaskExecuteRunningAckCommand.class);
 
-        if (taskAckCommand == null) {
-            logger.error("dBTask ACK request command is null");
+        if (runningAckCommand == null) {
+            logger.error("task execute running ack command is null");
             return;
         }
-        logger.info("dBTask ACK request command : {}", taskAckCommand);
+        logger.info("task execute running ack command : {}", runningAckCommand);
 
-        if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
-            ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+        if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+            ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3ec0e9a..1c8fe55 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -90,10 +91,17 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
                 new NettyRemoteChannel(channel, command.getOpaque()));
 
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
-        taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
-        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
-        TaskCallbackService.remove(killCommand.getTaskInstanceId());
+        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+        if (taskExecutionContext == null) {
+            logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
+            return;
+        }
+        taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
+        taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
+
+        taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
+        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
         logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index 4201373..fc737ca 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -47,7 +47,7 @@ public class RetryReportTaskStatusThread implements Runnable {
     private TaskCallbackService taskCallbackService;
 
     public void start() {
-        Thread thread = new Thread(this,"RetryReportTaskStatusThread");
+        Thread thread = new Thread(this, "RetryReportTaskStatusThread");
         thread.setDaemon(true);
         thread.start();
     }
@@ -65,21 +65,21 @@ public class RetryReportTaskStatusThread implements Runnable {
             ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
 
             try {
-                if (!instance.getAckCache().isEmpty()) {
-                    Map<Integer,Command> ackCache =  instance.getAckCache();
-                    for (Map.Entry<Integer, Command> entry : ackCache.entrySet()) {
+                if (!instance.getRunningCache().isEmpty()) {
+                    Map<Integer, Command> runningCache = instance.getRunningCache();
+                    for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
                         Integer taskInstanceId = entry.getKey();
-                        Command ackCommand = entry.getValue();
-                        taskCallbackService.sendAck(taskInstanceId,ackCommand);
+                        Command runningCommand = entry.getValue();
+                        taskCallbackService.send(taskInstanceId, runningCommand);
                     }
                 }
 
                 if (!instance.getResponseCache().isEmpty()) {
-                    Map<Integer,Command> responseCache =  instance.getResponseCache();
+                    Map<Integer, Command> responseCache = instance.getResponseCache();
                     for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
                         Integer taskInstanceId = entry.getKey();
                         Command responseCommand = entry.getValue();
-                        taskCallbackService.sendResult(taskInstanceId,responseCommand);
+                        taskCallbackService.send(taskInstanceId, responseCommand);
                     }
                 }
             } catch (Exception e) {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 8f3dc3a..7670d49 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,19 +17,15 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import com.github.rholder.retry.RetryException;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.RetryerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,17 +33,14 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,11 +50,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * task scheduler thread
@@ -131,35 +124,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
     @Override
     public void run() {
-        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
         if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
-            responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
-            responseCommand.setEndTime(new Date());
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
+            taskExecutionContext.setStartTime(new Date());
+            taskExecutionContext.setEndTime(new Date());
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
-            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
             return;
         }
 
         try {
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
-            // check if the OS user exists
-            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
-                String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
-                logger.error(errorLog);
-                responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
-                responseCommand.setEndTime(new Date());
-                return;
-            }
-
             if (taskExecutionContext.getStartTime() == null) {
                 taskExecutionContext.setStartTime(new Date());
             }
-            if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) {
-                changeTaskExecutionStatusToRunning();
-            }
             logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
 
+            // callback task execute running
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+            taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
+
             // copy hdfs/minio file to local
             downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
 
@@ -197,29 +181,27 @@ public class TaskExecuteThread implements Runnable, Delayed {
             // task handle
             this.task.handle();
 
-            responseCommand.setStatus(this.task.getExitStatus().getCode());
-
             // task result process
             if (this.task.getNeedAlert()) {
-                sendAlert(this.task.getTaskAlertInfo(), responseCommand.getStatus());
+                sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
             }
 
-            responseCommand.setEndTime(new Date());
-            responseCommand.setProcessId(this.task.getProcessId());
-            responseCommand.setAppIds(this.task.getAppIds());
-            responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
+            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
+            taskExecutionContext.setProcessId(this.task.getProcessId());
+            taskExecutionContext.setAppIds(this.task.getAppIds());
+            taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
             logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
         } catch (Throwable e) {
             logger.error("task scheduler failure", e);
             kill();
-            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
-            responseCommand.setEndTime(new Date());
-            responseCommand.setProcessId(task.getProcessId());
-            responseCommand.setAppIds(task.getAppIds());
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+            taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
+            taskExecutionContext.setProcessId(this.task.getProcessId());
+            taskExecutionContext.setAppIds(this.task.getAppIds());
         } finally {
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
-            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
             clearTaskExecPath();
         }
     }
@@ -312,7 +294,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
                     // query the tenant code of the resource according to the name of the resource
                     String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
-                    storageOperate.download(tenantCode,resHdfsPath, execLocalPath + File.separator + fullName, false, true);
+                    storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                     throw new ServiceException(e.getMessage());
@@ -324,40 +306,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
     }
 
     /**
-     * send an ack to change the status of the task.
-     */
-    private void changeTaskExecutionStatusToRunning() {
-        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-        Command ackCommand = buildAckCommand().convert2Command();
-        try {
-            RetryerUtils.retryCall(() -> {
-                taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
-                return Boolean.TRUE;
-            });
-        } catch (ExecutionException | RetryException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * build ack command.
-     *
-     * @return TaskExecuteAckCommand
-     */
-    private TaskExecuteAckCommand buildAckCommand() {
-        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
-        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        ackCommand.setStartTime(taskExecutionContext.getStartTime());
-        ackCommand.setLogPath(taskExecutionContext.getLogPath());
-        ackCommand.setHost(taskExecutionContext.getHost());
-
-        ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
-
-        return ackCommand;
-    }
-
-    /**
      * get current TaskExecutionContext
      *
      * @return TaskExecutionContext
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 9fd7bafc..0dd28e5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -108,7 +108,7 @@ public class WorkerManagerThread implements Runnable {
         TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
         responseCommand.setStatus(ExecutionStatus.KILL.getCode());
         ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
-        taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
     }
 
     /**
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index fba6c0e..2bb23ca 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
-    JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+        JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
 @Ignore
 public class TaskExecuteProcessorTest {
 
@@ -84,7 +84,7 @@ public class TaskExecuteProcessorTest {
         workerConfig.setListenPort(1234);
         command = new Command();
         command.setType(CommandType.TASK_EXECUTE_REQUEST);
-        ackCommand = new TaskExecuteAckCommand().convert2Command();
+        ackCommand = new TaskExecuteRunningCommand().convert2Command();
         taskRequestCommand = new TaskExecuteRequestCommand();
         alertClientService = PowerMockito.mock(AlertClientService.class);
         workerExecService = PowerMockito.mock(ExecutorService.class);
@@ -95,7 +95,7 @@ public class TaskExecuteProcessorTest {
         PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
 
         taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
-        PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+        PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
 
         PowerMockito.mockStatic(SpringApplicationContext.class);
         PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
@@ -125,10 +125,10 @@ public class TaskExecuteProcessorTest {
 
         PowerMockito.mockStatic(FileUtils.class);
         PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                taskExecutionContext.getProcessDefineCode(),
-                taskExecutionContext.getProcessDefineVersion(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId()))
+                        taskExecutionContext.getProcessDefineCode(),
+                        taskExecutionContext.getProcessDefineVersion(),
+                        taskExecutionContext.getProcessInstanceId(),
+                        taskExecutionContext.getTaskInstanceId()))
                 .thenReturn(taskExecutionContext.getExecutePath());
         PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());