You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/12 06:08:48 UTC

[dolphinscheduler] branch dev updated: [Fix-10827] Fix network error cause worker cannot send message to master (#10886)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new cade66a9b6 [Fix-10827] Fix network error cause worker cannot send message to master (#10886)
cade66a9b6 is described below

commit cade66a9b6b14450c6f778f7e05820b588856a41
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jul 12 14:08:42 2022 +0800

    [Fix-10827] Fix network error cause worker cannot send message to master (#10886)
    
    * Fix network error cause worker cannot send message to master
---
 .../server/master/config/MasterConfig.java         |   6 +
 .../master/consumer/TaskPriorityQueueConsumer.java |   8 +-
 .../dispatch/executor/NettyExecutorManager.java    |   4 +-
 .../server/master/event/TaskDelayEventHandler.java |   8 +-
 .../event/TaskRejectByWorkerEventHandler.java      |  19 +-
 .../master/event/TaskResultEventHandler.java       |  20 +-
 .../master/event/TaskRunningEventHandler.java      |   8 +-
 .../processor/TaskExecuteResponseProcessor.java    |  17 +-
 .../processor/TaskExecuteRunningProcessor.java     |   6 +-
 .../master/processor/TaskRecallProcessor.java      |   6 +-
 .../server/master/processor/queue/TaskEvent.java   |   8 +-
 .../server/master/rpc/MasterRPCServer.java         |   4 +-
 .../master/runner/StateWheelExecuteThread.java     |   2 -
 .../master/dispatch/ExecutionContextTestUtils.java |  16 +-
 .../master/dispatch/ExecutorDispatcherTest.java    |   6 +-
 .../executor/NettyExecutorManagerTest.java         |  25 +-
 .../master/processor/TaskAckProcessorTest.java     |  20 +-
 .../processor/queue/TaskResponseServiceTest.java   |  47 ++--
 .../remote/command/BaseCommand.java                |  57 +++++
 .../remote/command/CommandHeader.java              |  47 +---
 .../remote/command/CommandType.java                |  39 +--
 .../remote/command/HostUpdateCommand.java          |   3 -
 .../remote/command/StateEventResponseCommand.java  |   2 +-
 ...equestCommand.java => TaskDispatchCommand.java} |  24 +-
 ...lAckCommand.java => TaskExecuteAckCommand.java} |  48 ++--
 .../remote/command/TaskExecuteResponseCommand.java | 212 ----------------
 ...kCommand.java => TaskExecuteResultCommand.java} |  97 +++++---
 ...mand.java => TaskExecuteRunningAckMessage.java} |   6 +-
 .../remote/command/TaskExecuteRunningCommand.java  | 101 +-------
 ...llAckCommand.java => TaskRejectAckCommand.java} |  47 ++--
 ...skRecallCommand.java => TaskRejectCommand.java} |  48 +---
 .../apache/dolphinscheduler/remote/utils/Host.java |   3 +-
 .../server/worker/WorkerServer.java                |  18 +-
 .../server/worker/cache/ResponseCache.java         | 115 ---------
 .../server/worker/config/WorkerConfig.java         |   8 +-
 .../server/worker/message/MessageRetryRunner.java  | 139 +++++++++++
 .../server/worker/message/MessageSender.java       |  49 ++--
 .../message/TaskExecuteResultMessageSender.java    |  70 ++++++
 .../message/TaskExecuteRunningMessageSender.java   |  67 +++++
 .../worker/message/TaskRejectMessageSender.java    |  59 +++++
 .../worker/processor/HostUpdateProcessor.java      |  13 +-
 .../worker/processor/TaskCallbackService.java      | 276 ---------------------
 ...teProcessor.java => TaskDispatchProcessor.java} |  78 +++---
 .../processor/TaskExecuteResponseAckProcessor.java |  71 ------
 .../processor/TaskExecuteResultAckProcessor.java   |  83 +++++++
 .../processor/TaskExecuteRunningAckProcessor.java  |  17 +-
 .../server/worker/processor/TaskKillProcessor.java |  50 ++--
 ...kProcessor.java => TaskRejectAckProcessor.java} |  40 +--
 .../server/worker/rpc/WorkerMessageSender.java     |  92 +++++++
 .../server/worker/rpc/WorkerRpcClient.java         |  75 ++++++
 .../server/worker/rpc/WorkerRpcServer.java         |  18 +-
 .../worker/runner/RetryReportTaskStatusThread.java | 134 ----------
 .../server/worker/runner/TaskExecuteThread.java    |  85 ++++---
 .../server/worker/runner/WorkerExecService.java    |   6 +-
 .../server/worker/runner/WorkerManagerThread.java  |  19 +-
 ...sorTest.java => TaskDispatchProcessorTest.java} |  92 ++++---
 .../worker/runner/TaskExecuteThreadTest.java       |  12 +-
 57 files changed, 1224 insertions(+), 1426 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 0494b7dd00..fe51e20984 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.config;
 
+import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -82,6 +83,10 @@ public class MasterConfig implements Validator {
     private double reservedMemory = 0.3;
     private Duration failoverInterval = Duration.ofMinutes(10);
     private boolean killYarnJobWhenTaskFailover = true;
+    /**
+     * ip:listenPort
+     */
+    private String masterAddress;
 
     @Override
     public boolean supports(Class<?> clazz) {
@@ -124,5 +129,6 @@ public class MasterConfig implements Validator {
         if (masterConfig.getMaxCpuLoadAvg() <= 0) {
             masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 82198e942a..9c7d048ff8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -241,7 +241,11 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     }
 
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
+        // todo: we don't set the host here, since right now we don't need to retry this message.
+        TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
+                                                                     masterConfig.getMasterAddress(),
+                                                                     taskExecutionContext.getHost(),
+                                                                     System.currentTimeMillis());
         return requestCommand.convert2Command();
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index fe172f9816..e273e8f8aa 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -88,10 +88,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
 
     @PostConstruct
     public void init() {
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
         this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 8e2dcced23..25fa88dd00 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -108,9 +108,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
 
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If event handle success, send ack to worker to otherwise the worker will retry this event
-        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
-            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+        TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
+            new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
index d09a8364ca..ac4dab50f2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
@@ -20,8 +20,9 @@ package org.apache.dolphinscheduler.server.master.event;
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
@@ -34,13 +35,16 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     @Override
     public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
         int taskInstanceId = taskEvent.getTaskInstanceId();
         int processInstanceId = taskEvent.getProcessInstanceId();
 
-        WorkflowExecuteRunnable workflowExecuteRunnable =
-            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
+            processInstanceId);
         if (workflowExecuteRunnable == null) {
             sendAckToWorker(taskEvent);
             throw new TaskEventHandleError(
@@ -65,9 +69,12 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
     }
 
     public void sendAckToWorker(TaskEvent taskEvent) {
-        TaskRecallAckCommand taskRecallAckCommand =
-            new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-        taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
+        TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
+                                                                             taskEvent.getTaskInstanceId(),
+                                                                             masterConfig.getMasterAddress(),
+                                                                             taskEvent.getWorkerAddress(),
+                                                                             System.currentTimeMillis());
+        taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index 67df03682f..0373baeee1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -22,8 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -50,13 +51,16 @@ public class TaskResultEventHandler implements TaskEventHandler {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     @Override
     public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
         int taskInstanceId = taskEvent.getTaskInstanceId();
         int processInstanceId = taskEvent.getProcessInstanceId();
 
-        WorkflowExecuteRunnable workflowExecuteRunnable =
-            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
+            processInstanceId);
         if (workflowExecuteRunnable == null) {
             sendAckToWorker(taskEvent);
             throw new TaskEventHandleError(
@@ -105,9 +109,13 @@ public class TaskResultEventHandler implements TaskEventHandler {
     }
 
     public void sendAckToWorker(TaskEvent taskEvent) {
-        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
-            new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-        taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+        // we don't set the receiver address, since the ack doesn't need to be retried
+        TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
+                                                                                taskEvent.getTaskInstanceId(),
+                                                                                masterConfig.getMasterAddress(),
+                                                                                taskEvent.getWorkerAddress(),
+                                                                                System.currentTimeMillis());
+        taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 31152973a2..d271839596 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -106,9 +106,9 @@ public class TaskRunningEventHandler implements TaskEventHandler {
 
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If event handle success, send ack to worker to otherwise the worker will retry this event
-        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
-            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+        TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
+            new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index b17def873c..7f438e1d9c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -55,15 +55,18 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
-        TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+        TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
+                                                                                  TaskExecuteResultCommand.class);
+        TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
         try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
-            logger.info("Received task execute response, event: {}", taskResponseEvent);
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
+                                                        taskResultEvent.getTaskInstanceId());
+            logger.info("Received task execute result, event: {}", taskResultEvent);
 
-            taskEventService.addEvent(taskResponseEvent);
+            taskEventService.addEvent(taskResultEvent);
         } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index c5ddc3d77a..96ff1ca405 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
-        logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
+        TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
+        logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
 
-        TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+        TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
         taskEventService.addEvent(taskEvent);
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
index e37dc6e06a..a5d404ec65 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -54,8 +54,8 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
+        Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
+        TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
         TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
         try {
             LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 248d253739..e383cad612 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.TaskEventType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 
 import java.util.Date;
@@ -120,7 +120,7 @@ public class TaskEvent {
         return event;
     }
 
-    public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
+    public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
         TaskEvent event = new TaskEvent();
         event.setProcessInstanceId(command.getProcessInstanceId());
         event.setTaskInstanceId(command.getTaskInstanceId());
@@ -138,7 +138,7 @@ public class TaskEvent {
         return event;
     }
 
-    public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
+    public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) {
         TaskEvent event = new TaskEvent();
         event.setTaskInstanceId(command.getTaskInstanceId());
         event.setProcessInstanceId(command.getProcessInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 49d40a4b04..b6b86979de 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -84,14 +84,14 @@ public class MasterRPCServer implements AutoCloseable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(masterConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
 
         // logger server
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 843c1b246c..0d1b3a423a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -126,8 +126,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
         if (removeFlag) {
             logger.info("Success remove workflow instance from timeout check list");
-        } else {
-            logger.warn("Failed to remove workflow instance from timeout check list");
         }
     }
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index 55dd236f71..7252f747ba 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -24,12 +24,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 import org.mockito.Mockito;
 
@@ -44,12 +43,15 @@ public class ExecutionContextTestUtils {
         processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
         taskInstance.setProcessInstance(processInstance);
         TaskExecutionContext context = TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(processInstance)
-                .buildProcessDefinitionRelatedInfo(processDefinition)
-                .create();
+                                                                  .buildTaskInstanceRelatedInfo(taskInstance)
+                                                                  .buildProcessInstanceRelatedInfo(processInstance)
+                                                                  .buildProcessDefinitionRelatedInfo(processDefinition)
+                                                                  .create();
 
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
+        TaskDispatchCommand requestCommand = new TaskDispatchCommand(context,
+                                                                     "127.0.0.1:5678",
+                                                                     "127.0.0.1:5678",
+                                                                     System.currentTimeMillis());
         Command command = requestCommand.convert2Command();
 
         ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
index 4bfc3ca0a6..fa80e8344a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -18,11 +18,12 @@
 package org.apache.dolphinscheduler.server.master.dispatch;
 
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 
 import org.junit.Ignore;
@@ -60,7 +61,8 @@ public class ExecutorDispatcherTest {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(port);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, Mockito.mock(TaskExecuteProcessor.class));
+        nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, Mockito.mock(
+            TaskDispatchProcessor.class));
         nettyRemotingServer.start();
         //
         workerConfig.setListenPort(port);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 1ee890faf2..b69ab890f9 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -25,15 +25,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
 
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -49,16 +48,15 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 @RunWith(SpringJUnit4ClassRunner.class)
 @Ignore
 public class NettyExecutorManagerTest {
-
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
-
     @Test
     public void testExecute() throws ExecuteException {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
+        nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_DISPATCH_REQUEST,
+                                              new TaskDispatchProcessor());
         nettyRemotingServer.start();
         TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
         ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
@@ -66,10 +64,10 @@ public class NettyExecutorManagerTest {
         processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
         taskInstance.setProcessInstance(processInstance);
         TaskExecutionContext context = TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(processInstance)
-                .buildProcessDefinitionRelatedInfo(processDefinition)
-                .create();
+                                                                  .buildTaskInstanceRelatedInfo(taskInstance)
+                                                                  .buildProcessInstanceRelatedInfo(processInstance)
+                                                                  .buildProcessDefinitionRelatedInfo(processDefinition)
+                                                                  .create();
         ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
         executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
         Boolean execute = nettyExecutorManager.execute(executionContext);
@@ -94,10 +92,11 @@ public class NettyExecutorManagerTest {
         nettyExecutorManager.execute(executionContext);
 
     }
-
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        requestCommand.setTaskExecutionContext(taskExecutionContext);
+        TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
+                                                                     "127.0.0.1:5678",
+                                                                     "127.0.0.1:1234",
+                                                                     System.currentTimeMillis());
         return requestCommand.convert2Command();
     }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
index fc320ab493..8c15cd9cbe 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
     private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
     private TaskEventService taskEventService;
     private ProcessService processService;
-    private TaskExecuteRunningCommand taskExecuteRunningCommand;
+    private TaskExecuteRunningCommand taskExecuteRunningMessage;
     private TaskEvent taskResponseEvent;
     private Channel channel;
 
@@ -63,14 +63,16 @@ public class TaskAckProcessorTest {
         channel = PowerMockito.mock(Channel.class);
         taskResponseEvent = PowerMockito.mock(TaskEvent.class);
 
-        taskExecuteRunningCommand = new TaskExecuteRunningCommand();
-        taskExecuteRunningCommand.setStatus(1);
-        taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
-        taskExecuteRunningCommand.setHost("localhost");
-        taskExecuteRunningCommand.setLogPath("/temp/worker.log");
-        taskExecuteRunningCommand.setStartTime(new Date());
-        taskExecuteRunningCommand.setTaskInstanceId(1);
-        taskExecuteRunningCommand.setProcessInstanceId(1);
+        taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
+                                                                  " 127.0.0.1:1234",
+                                                                  System.currentTimeMillis());
+        taskExecuteRunningMessage.setStatus(1);
+        taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker");
+        taskExecuteRunningMessage.setHost("localhost");
+        taskExecuteRunningMessage.setLogPath("/temp/worker.log");
+        taskExecuteRunningMessage.setStartTime(new Date());
+        taskExecuteRunningMessage.setTaskInstanceId(1);
+        taskExecuteRunningMessage.setProcessInstanceId(1);
     }
 
     @Test
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index c5b00db12c..3854ad77b0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
+import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -76,26 +77,30 @@ public class TaskResponseServiceTest {
 
         Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
 
-        TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
-        taskExecuteRunningCommand.setProcessId(1);
-        taskExecuteRunningCommand.setTaskInstanceId(22);
-        taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
-        taskExecuteRunningCommand.setExecutePath("path");
-        taskExecuteRunningCommand.setLogPath("logPath");
-        taskExecuteRunningCommand.setHost("127.*.*.*");
-        taskExecuteRunningCommand.setStartTime(new Date());
-
-        ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
-
-        TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
-        taskExecuteResponseCommand.setProcessInstanceId(1);
-        taskExecuteResponseCommand.setTaskInstanceId(22);
-        taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
-        taskExecuteResponseCommand.setEndTime(new Date());
-        taskExecuteResponseCommand.setVarPool("varPol");
-        taskExecuteResponseCommand.setAppIds("ids");
-        taskExecuteResponseCommand.setProcessId(1);
-        resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+        TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
+                                                                                            "127.0.0.1:1234",
+                                                                                            System.currentTimeMillis());
+        taskExecuteRunningMessage.setProcessId(1);
+        taskExecuteRunningMessage.setTaskInstanceId(22);
+        taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
+        taskExecuteRunningMessage.setExecutePath("path");
+        taskExecuteRunningMessage.setLogPath("logPath");
+        taskExecuteRunningMessage.setHost("127.*.*.*");
+        taskExecuteRunningMessage.setStartTime(new Date());
+
+        ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
+
+        TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
+                                                                                         NetUtils.getAddr(5678),
+                                                                                         System.currentTimeMillis());
+        taskExecuteResultMessage.setProcessInstanceId(1);
+        taskExecuteResultMessage.setTaskInstanceId(22);
+        taskExecuteResultMessage.setStatus(ExecutionStatus.SUCCESS.getCode());
+        taskExecuteResultMessage.setEndTime(new Date());
+        taskExecuteResultMessage.setVarPool("varPol");
+        taskExecuteResultMessage.setAppIds("ids");
+        taskExecuteResultMessage.setProcessId(1);
+        resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
 
         taskInstance = new TaskInstance();
         taskInstance.setId(22);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java
new file mode 100644
index 0000000000..8207b5fecc
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * This is the base class for RPC messages.
+ * <p>
+ * Since we use async mode, the client sends a message and then waits for the target server
+ * to send an ack for that message; the client will retry for a while if it doesn't receive an ack.
+ * <p>
+ * When there is a network error, the server cannot send the ack to the client over the original channel,
+ * since the client has closed the channel, so the server needs to know the command source.
+ */
+@Data
+@NoArgsConstructor
+public abstract class BaseCommand implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    /**
+     * If the message receiver wants to send an ack to the sender, it needs to use this address.
+     */
+    protected String messageSenderAddress;
+
+    /**
+     * The message receiver address.
+     */
+    protected String messageReceiverAddress;
+
+    protected long messageSendTime;
+
+    protected BaseCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        this.messageSenderAddress = messageSenderAddress;
+        this.messageReceiverAddress = messageReceiverAddress;
+        this.messageSendTime = messageSendTime;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 9e83a426f9..32eb75dc90 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -14,13 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
 import java.io.Serializable;
 
+import lombok.Data;
+
 /**
  *  command header
  */
+@Data
 public class CommandHeader implements Serializable {
 
     /**
@@ -34,12 +38,12 @@ public class CommandHeader implements Serializable {
     private long opaque;
 
     /**
-     *  context length
+     * context length
      */
     private int contextLength;
 
     /**
-     *  context
+     * context
      */
     private byte[] context;
 
@@ -48,43 +52,4 @@ public class CommandHeader implements Serializable {
      */
     private int bodyLength;
 
-    public int getBodyLength() {
-        return bodyLength;
-    }
-
-    public void setBodyLength(int bodyLength) {
-        this.bodyLength = bodyLength;
-    }
-
-    public byte getType() {
-        return type;
-    }
-
-    public void setType(byte type) {
-        this.type = type;
-    }
-
-    public long getOpaque() {
-        return opaque;
-    }
-
-    public void setOpaque(long opaque) {
-        this.opaque = opaque;
-    }
-
-    public int getContextLength() {
-        return contextLength;
-    }
-
-    public void setContextLength(int contextLength) {
-        this.contextLength = contextLength;
-    }
-
-    public byte[] getContext() {
-        return context;
-    }
-
-    public void setContext(byte[] context) {
-        this.context = context;
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 43a64fc559..040d82d974 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -64,9 +64,9 @@ public enum CommandType {
     MASTER_RESPONSE,
 
     /**
-     * execute task request
+     * dispatch task request
      */
-    TASK_EXECUTE_REQUEST,
+    TASK_DISPATCH_REQUEST,
 
     /**
      * task execute running, from worker to master
@@ -81,56 +81,29 @@ public enum CommandType {
     /**
      * task execute response, from worker to master
      */
-    TASK_EXECUTE_RESPONSE,
+    TASK_EXECUTE_RESULT,
 
     /**
      * task execute response ack, from master to worker
      */
-    TASK_EXECUTE_RESPONSE_ACK,
+    TASK_EXECUTE_RESULT_ACK,
 
-    /**
-     * kill task
-     */
     TASK_KILL_REQUEST,
 
-    /**
-     * kill task response
-     */
     TASK_KILL_RESPONSE,
 
-    /**
-     * task recall
-     */
-    TASK_RECALL,
+    TASK_REJECT,
 
-    /**
-     * task recall ack
-     */
-    TASK_RECALL_ACK,
+    TASK_REJECT_ACK,
 
-    /**
-     * HEART_BEAT
-     */
     HEART_BEAT,
 
-    /**
-     * ping
-     */
     PING,
 
-    /**
-     * pong
-     */
     PONG,
 
-    /**
-     * alert send request
-     */
     ALERT_SEND_REQUEST,
 
-    /**
-     * alert send response
-     */
     ALERT_SEND_RESPONSE,
 
     /**
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
index 4fc752e4b0..d060a8e0f5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
@@ -26,9 +26,6 @@ import java.io.Serializable;
  */
 public class HostUpdateCommand implements Serializable {
 
-    /**
-     * task id
-     */
     private int taskInstanceId;
 
     private String processHost;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
index 41ee1ef2ee..5500c744bd 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
@@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
+        command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
similarity index 66%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
index 000e3f4e02..f04c6107c1 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
@@ -20,24 +20,36 @@ package org.apache.dolphinscheduler.remote.command;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
-import java.io.Serializable;
-
-import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.ToString;
+
 
+/**
+ * The task dispatch message, used to dispatch a task to a worker.
+ */
 @Data
 @NoArgsConstructor
-@AllArgsConstructor
-public class TaskExecuteRequestCommand implements Serializable {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskDispatchCommand extends BaseCommand {
 
     private static final long serialVersionUID = -1L;
 
     private TaskExecutionContext taskExecutionContext;
 
+    public TaskDispatchCommand(TaskExecutionContext taskExecutionContext,
+                               String messageSenderAddress,
+                               String messageReceiverAddress,
+                               long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_REQUEST);
+        command.setType(CommandType.TASK_DISPATCH_REQUEST);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
similarity index 63%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
index 2221a6c09d..a70cf8f239 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
@@ -19,41 +19,34 @@ package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
 
 /**
- * task recall ack command
+ * task execute response ack command
+ * from master to worker
  */
-public class TaskRecallAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteAckCommand extends BaseCommand {
 
     private int taskInstanceId;
     private int status;
 
-    public TaskRecallAckCommand() {
-        super();
-    }
-
-    public TaskRecallAckCommand(int status, int taskInstanceId) {
+    public TaskExecuteAckCommand(int status,
+                                 int taskInstanceId,
+                                 String sourceServerAddress,
+                                 String messageReceiverAddress,
+                                 long messageSendTime) {
+        super(sourceServerAddress, messageReceiverAddress, messageSendTime);
         this.status = status;
         this.taskInstanceId = taskInstanceId;
     }
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
     /**
      * package response command
      *
@@ -61,14 +54,9 @@ public class TaskRecallAckCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_RECALL_ACK);
+        command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
-
-    @Override
-    public String toString() {
-        return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
deleted file mode 100644
index 4574b8818c..0000000000
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.remote.command;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * execute task response command
- */
-public class TaskExecuteResponseCommand implements Serializable {
-
-    public TaskExecuteResponseCommand() {
-    }
-
-    public TaskExecuteResponseCommand(int taskInstanceId, int processInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-        this.processInstanceId = processInstanceId;
-    }
-
-    /**
-     * task instance id
-     */
-    private int taskInstanceId;
-
-    /**
-     * process instance id
-     */
-    private int processInstanceId;
-
-    /**
-     * status
-     */
-    private int status;
-
-    /**
-     * startTime
-     */
-    private Date startTime;
-
-    /**
-     * host
-     */
-    private String host;
-
-    /**
-     * logPath
-     */
-    private String logPath;
-
-    /**
-     * executePath
-     */
-    private String executePath;
-
-
-    /**
-     * end time
-     */
-    private Date endTime;
-
-
-    /**
-     * processId
-     */
-    private int processId;
-
-    /**
-     * appIds
-     */
-    private String appIds;
-
-    /**
-     * varPool string
-     */
-    private String varPool;
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
-    public String getExecutePath() {
-        return executePath;
-    }
-
-    public void setExecutePath(String executePath) {
-        this.executePath = executePath;
-    }
-
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public String getVarPool() {
-        return varPool;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
-    }
-
-    /**
-     * package response command
-     *
-     * @return command
-     */
-    public Command convert2Command() {
-        Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_RESPONSE);
-        byte[] body = JSONUtils.toJsonByteArray(this);
-        command.setBody(body);
-        return command;
-    }
-
-    @Override
-    public String toString() {
-        return "TaskExecuteResponseCommand{"
-                + "taskInstanceId=" + taskInstanceId
-                + ", processInstanceId=" + processInstanceId
-                + ", status=" + status
-                + ", startTime=" + startTime
-                + ", endTime=" + endTime
-                + ", host=" + host
-                + ", logPath=" + logPath
-                + ", executePath=" + executePath
-                + ", processId=" + processId
-                + ", appIds='" + appIds + '\''
-                + ", varPool=" + varPool
-                + '}';
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
similarity index 51%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
index f3df257b9d..12dfa56976 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
@@ -19,41 +19,82 @@ package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
-import java.io.Serializable;
+import java.util.Date;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
 
 /**
- * task execute response ack command
- * from master to worker
+ * Task execute result command.
  */
-public class TaskExecuteResponseAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteResultCommand extends BaseCommand {
+
+    public TaskExecuteResultCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+    }
 
+    /**
+     * task instance id
+     */
     private int taskInstanceId;
+
+    /**
+     * process instance id
+     */
+    private int processInstanceId;
+
+    /**
+     * status
+     */
     private int status;
 
-    public TaskExecuteResponseAckCommand() {
-        super();
-    }
+    /**
+     * startTime
+     */
+    private Date startTime;
 
-    public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
-        this.status = status;
-        this.taskInstanceId = taskInstanceId;
-    }
+    /**
+     * host
+     */
+    private String host;
 
-    public int getStatus() {
-        return status;
-    }
+    /**
+     * logPath
+     */
+    private String logPath;
 
-    public void setStatus(int status) {
-        this.status = status;
-    }
+    /**
+     * executePath
+     */
+    private String executePath;
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
 
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
+    /**
+     * end time
+     */
+    private Date endTime;
+
+
+    /**
+     * processId
+     */
+    private int processId;
+
+    /**
+     * appIds
+     */
+    private String appIds;
+
+    /**
+     * varPool string
+     */
+    private String varPool;
 
     /**
      * package response command
@@ -62,17 +103,9 @@ public class TaskExecuteResponseAckCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
+        command.setType(CommandType.TASK_EXECUTE_RESULT);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
-
-    @Override
-    public String toString() {
-        return "TaskExecuteResponseAckCommand{"
-            + "taskInstanceId=" + taskInstanceId
-            + ", status=" + status
-            + '}';
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
similarity index 92%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
index b0bb666cba..3e17ade0ad 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
@@ -25,16 +25,16 @@ import java.io.Serializable;
  * task execute running ack command
  * from master to worker
  */
-public class TaskExecuteRunningAckCommand implements Serializable {
+public class TaskExecuteRunningAckMessage implements Serializable {
 
     private int taskInstanceId;
     private int status;
 
-    public TaskExecuteRunningAckCommand() {
+    public TaskExecuteRunningAckMessage() {
         super();
     }
 
-    public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
+    public TaskExecuteRunningAckMessage(int status, int taskInstanceId) {
         this.status = status;
         this.taskInstanceId = taskInstanceId;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
index 0a2eac29f3..241d37744a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
@@ -19,14 +19,21 @@ package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
-import java.io.Serializable;
 import java.util.Date;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
 /**
- * task execute running command
- * from worker to master
+ * Task running message, which means the task is running on the worker.
  */
-public class TaskExecuteRunningCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteRunningCommand extends BaseCommand {
 
     /**
      * taskInstanceId
@@ -73,76 +80,8 @@ public class TaskExecuteRunningCommand implements Serializable {
      */
     private String appIds;
 
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
-    public String getExecutePath() {
-        return executePath;
-    }
-
-    public void setExecutePath(String executePath) {
-        this.executePath = executePath;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
+    public TaskExecuteRunningCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
     }
 
     /**
@@ -158,18 +97,4 @@ public class TaskExecuteRunningCommand implements Serializable {
         return command;
     }
 
-    @Override
-    public String toString() {
-        return "TaskExecuteRunningCommand{"
-                + "taskInstanceId=" + taskInstanceId
-                + ", processInstanceId='" + processInstanceId + '\''
-                + ", startTime=" + startTime
-                + ", host='" + host + '\''
-                + ", status=" + status
-                + ", logPath='" + logPath + '\''
-                + ", executePath='" + executePath + '\''
-                + ", processId=" + processId + '\''
-                + ", appIds='" + appIds + '\''
-                + '}';
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
similarity index 63%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
index 2221a6c09d..aed647bb4c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
@@ -19,41 +19,30 @@ package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
 
-/**
- * task recall ack command
- */
-public class TaskRecallAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskRejectAckCommand extends BaseCommand {
 
     private int taskInstanceId;
     private int status;
 
-    public TaskRecallAckCommand() {
-        super();
-    }
-
-    public TaskRecallAckCommand(int status, int taskInstanceId) {
+    public TaskRejectAckCommand(int status,
+                                int taskInstanceId,
+                                String messageSenderAddress,
+                                String messageReceiverAddress,
+                                long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
         this.status = status;
         this.taskInstanceId = taskInstanceId;
     }
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
     /**
      * package response command
      *
@@ -61,14 +50,10 @@ public class TaskRecallAckCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_RECALL_ACK);
+        command.setType(CommandType.TASK_REJECT_ACK);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
-    @Override
-    public String toString() {
-        return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
similarity index 60%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
index 0cd81dc3e2..a9ce5c56a7 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
@@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
 
 /**
- * kill task recall command
+ * Task reject message, which means the task has been rejected by the worker.
  */
-public class TaskRecallCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskRejectCommand extends BaseCommand {
 
     /**
      * taskInstanceId
@@ -41,28 +48,8 @@ public class TaskRecallCommand implements Serializable {
      */
     private int processInstanceId;
 
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
+    public TaskRejectCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
     }
 
     /**
@@ -72,18 +59,9 @@ public class TaskRecallCommand implements Serializable {
      */
     public Command convert2Command() {
         Command command = new Command();
-        command.setType(CommandType.TASK_RECALL);
+        command.setType(CommandType.TASK_REJECT);
         byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
-
-    @Override
-    public String toString() {
-        return "TaskRecallCommand{"
-            + "taskInstanceId=" + taskInstanceId
-            + ", host='" + host + '\''
-            + ", processInstanceId=" + processInstanceId
-            + '}';
-    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index dc8e1f0d36..edbaea13ce 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.remote.utils;
 
+import lombok.NonNull;
 import static org.apache.dolphinscheduler.common.Constants.COLON;
 
 import java.io.Serializable;
@@ -95,7 +96,7 @@ public class Host implements Serializable {
      * @param address address
      * @return host
      */
-    public static Host of(String address) {
+    public static Host of(@NonNull String address) {
         String[] parts = splitAddress(address);
         return new Host(parts[0], Integer.parseInt(parts[1]));
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index a8a401cde7..d97a994249 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -25,9 +25,10 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -79,9 +80,6 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private AlertClientService alertClientService;
 
-    @Autowired
-    private RetryReportTaskStatusThread retryReportTaskStatusThread;
-
     @Autowired
     private WorkerManagerThread workerManagerThread;
 
@@ -99,6 +97,12 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private WorkerRpcServer workerRpcServer;
 
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
     /**
      * worker server startup, not use web service
      *
@@ -112,7 +116,7 @@ public class WorkerServer implements IStoppable {
     @PostConstruct
     public void run() {
         this.workerRpcServer.start();
-
+        this.workerRpcClient.start();
         this.taskPluginManager.installPlugin();
 
         this.workerRegistryClient.registry();
@@ -122,7 +126,7 @@ public class WorkerServer implements IStoppable {
 
         this.workerManagerThread.start();
 
-        this.retryReportTaskStatusThread.start();
+        this.messageRetryRunner.start();
 
         /*
          * registry hooks, which are called before the process exits
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
deleted file mode 100644
index f8e47fc43a..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.cache;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.remote.command.Command;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Response Cache : cache worker send master result
- */
-public class ResponseCache {
-
-    private static final ResponseCache instance = new ResponseCache();
-
-    private ResponseCache() {
-    }
-
-    public static ResponseCache get() {
-        return instance;
-    }
-
-    private final Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
-    private final Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
-    private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
-
-    /**
-     * cache response
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param command command
-     * @param event event ACK/RESULT
-     */
-    public void cache(Integer taskInstanceId, Command command, TaskEventType event) {
-        switch (event) {
-            case RUNNING:
-                runningCache.put(taskInstanceId, command);
-                break;
-            case RESULT:
-                responseCache.put(taskInstanceId, command);
-                break;
-            case WORKER_REJECT:
-                recallCache.put(taskInstanceId, command);
-                break;
-            default:
-                throw new IllegalArgumentException("invalid event type : " + event);
-        }
-    }
-
-    /**
-     * recall response cache
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public void removeRecallCache(Integer taskInstanceId) {
-        recallCache.remove(taskInstanceId);
-    }
-
-    public Map<Integer, Command> getRecallCache() {
-        return recallCache;
-    }
-
-    /**
-     * remove running cache
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public void removeRunningCache(Integer taskInstanceId) {
-        runningCache.remove(taskInstanceId);
-    }
-
-    /**
-     * remove response cache
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public void removeResponseCache(Integer taskInstanceId) {
-        responseCache.remove(taskInstanceId);
-    }
-
-    /**
-     * get running cache
-     *
-     * @return getAckCache
-     */
-    public Map<Integer, Command> getRunningCache() {
-        return runningCache;
-    }
-
-    /**
-     * getResponseCache
-     *
-     * @return getResponseCache
-     */
-    public Map<Integer, Command> getResponseCache() {
-        return responseCache;
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a641b28e1b..aab162d6d4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.worker.config;
 
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+
 import java.time.Duration;
 import java.util.Set;
 
@@ -46,6 +48,10 @@ public class WorkerConfig implements Validator {
     private Set<String> groups = Sets.newHashSet("default");
     private String alertListenHost = "localhost";
     private int alertListenPort = 50052;
+    /**
+     * This field doesn't need to be set in the config file; it is calculated as workerIp:listenPort.
+     */
+    private String workerAddress;
 
     @Override
     public boolean supports(Class<?> clazz) {
@@ -64,6 +70,6 @@ public class WorkerConfig implements Validator {
         if (workerConfig.getMaxCpuLoadAvg() <= 0) {
             workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
-
+        workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
new file mode 100644
index 0000000000..18dceb069b
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * Daemon thread that re-sends pending worker messages to the master.
+ *
+ * <p>Pending messages are kept per task instance id and per {@link CommandType} until they are removed via
+ * {@link #removeRetryMessage(int, CommandType)} or {@link #removeRetryMessages(int)}. A pending message is
+ * sent again once more than the retry window (5 minutes) has passed since its recorded send time.
+ */
+@Component
+public class MessageRetryRunner extends BaseDaemonThread {
+
+    private final Logger logger = LoggerFactory.getLogger(MessageRetryRunner.class);
+
+    /**
+     * Time in milliseconds that must have passed since a message's send time before it is sent again.
+     */
+    private static final long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    /**
+     * Message type to sender, filled by {@link #init()}.
+     */
+    private final Map<CommandType, MessageSender<BaseCommand>> messageSenderMap = new HashMap<>();
+
+    /**
+     * Task instance id to (message type to pending message).
+     */
+    private final Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages = new ConcurrentHashMap<>();
+
+    protected MessageRetryRunner() {
+        super("WorkerMessageRetryRunnerThread");
+    }
+
+    /**
+     * Collects every {@link MessageSender} bean from the application context and indexes it by the message
+     * type returned from {@link MessageSender#getMessageType()}.
+     */
+    @PostConstruct
+    public void init() {
+        Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
+        messageSenders.values().forEach(messageSender -> {
+            messageSenderMap.put(messageSender.getMessageType(), messageSender);
+            logger.info("Injected message sender: {}", messageSender.getClass().getName());
+        });
+    }
+
+    /**
+     * Starts the retry thread and logs before and after the start.
+     */
+    @Override
+    public synchronized void start() {
+        logger.info("Message retry runner starting");
+        super.start();
+        logger.info("Message retry runner started");
+    }
+
+    /**
+     * Registers a message for retry, replacing any pending message of the same type for the task instance.
+     *
+     * @param taskInstanceId task instance the message belongs to
+     * @param messageType    message type, also used to pick the sender on retry
+     * @param baseCommand    message to send again; must not be null since the backing map rejects null values
+     */
+    public void addRetryMessage(int taskInstanceId,
+                                @NonNull CommandType messageType,
+                                @NonNull BaseCommand baseCommand) {
+        needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>())
+                           .put(messageType, baseCommand);
+    }
+
+    /**
+     * Removes the pending message of the given type for the task instance, if there is one.
+     */
+    public void removeRetryMessage(int taskInstanceId, @NonNull CommandType messageType) {
+        Map<CommandType, BaseCommand> retryMessages = needToRetryMessages.get(taskInstanceId);
+        if (retryMessages != null) {
+            retryMessages.remove(messageType);
+        }
+    }
+
+    /**
+     * Removes all pending messages of the task instance.
+     */
+    public void removeRetryMessages(int taskInstanceId) {
+        needToRetryMessages.remove(taskInstanceId);
+    }
+
+    /**
+     * Sets the receiver address of every pending message of the task instance.
+     */
+    public void updateMessageHost(int taskInstanceId, String messageReceiverHost) {
+        // Named differently from the field to avoid shadowing it.
+        Map<CommandType, BaseCommand> taskMessages = needToRetryMessages.get(taskInstanceId);
+        if (taskMessages != null) {
+            taskMessages.values().forEach(baseMessage -> {
+                baseMessage.setMessageReceiverAddress(messageReceiverHost);
+            });
+        }
+    }
+
+    /**
+     * Retry loop: while {@link Stopper#isRunning()} is true, sends every pending message whose send time is
+     * older than the retry window, then sleeps for {@link Constants#SLEEP_TIME_MILLIS}. Stops when the
+     * thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                if (needToRetryMessages.isEmpty()) {
+                    // Nothing is pending, so wait one retry window before checking again.
+                    Thread.sleep(MESSAGE_RETRY_WINDOW);
+                }
+
+                long now = System.currentTimeMillis();
+                for (Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry : needToRetryMessages.entrySet()) {
+                    Integer taskInstanceId = taskEntry.getKey();
+                    LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+                    try {
+                        for (Map.Entry<CommandType, BaseCommand> messageEntry : taskEntry.getValue().entrySet()) {
+                            CommandType messageType = messageEntry.getKey();
+                            BaseCommand message = messageEntry.getValue();
+                            if (now - message.getMessageSendTime() <= MESSAGE_RETRY_WINDOW) {
+                                continue;
+                            }
+                            logger.info("Begin retry send message to master, message: {}", message);
+                            // Stamp the attempt before sending so that a failed message waits a full window again.
+                            message.setMessageSendTime(now);
+                            MessageSender<BaseCommand> messageSender = messageSenderMap.get(messageType);
+                            if (messageSender == null) {
+                                // Skip instead of failing with a NullPointerException that hides the cause.
+                                logger.error("Cannot find the message sender for type: {}, message: {}",
+                                             messageType, message);
+                                continue;
+                            }
+                            messageSender.sendMessage(message);
+                            logger.info("Success send message to master, message: {}", message);
+                        }
+                    } catch (Exception e) {
+                        logger.warn("Retry send message to master error", e);
+                    } finally {
+                        LoggerUtils.removeTaskInstanceIdMDC();
+                    }
+                }
+                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+            } catch (InterruptedException interrupted) {
+                logger.warn("The message retry thread is interrupted, will break this loop", interrupted);
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception ex) {
+                logger.error("Retry send message failed, get an unknown exception.", ex);
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
similarity index 53%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
index 000e3f4e02..519c5b2b3e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
@@ -15,32 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote.command;
+package org.apache.dolphinscheduler.server.worker.message;
 
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.io.Serializable;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class TaskExecuteRequestCommand implements Serializable {
-
-    private static final long serialVersionUID = -1L;
-
-    private TaskExecutionContext taskExecutionContext;
-
-    public Command convert2Command() {
-        Command command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_REQUEST);
-        byte[] body = JSONUtils.toJsonByteArray(this);
-        command.setBody(body);
-        return command;
-    }
-
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+
+public interface MessageSender<T extends BaseCommand> {
+
+    /**
+     * Send the given message.
+     *
+     * @throws RemotingException if the sender cannot connect to the target host.
+     */
+    void sendMessage(T message) throws RemotingException;
+
+    /**
+     * Build the message from the task context and the message receiver address.
+     */
+    T buildMessage(TaskExecutionContext taskExecutionContext, String messageReceiverAddress);
+
+    /**
+     * The message type that can be sent by this sender.
+     */
+    CommandType getMessageType();
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java
new file mode 100644
index 0000000000..a7d05a4791
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link MessageSender} for {@link TaskExecuteResultCommand} messages
+ * ({@link CommandType#TASK_EXECUTE_RESULT}).
+ */
+@Component
+public class TaskExecuteResultMessageSender implements MessageSender<TaskExecuteResultCommand> {
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    /**
+     * Send the message through the worker RPC client to the receiver address stored in the message.
+     *
+     * @param message the result message to send
+     * @throws RemotingException if the message cannot be sent
+     */
+    @Override
+    public void sendMessage(TaskExecuteResultCommand message) throws RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+    }
+
+    /**
+     * Build a result message from the task execution context. The command is constructed with the
+     * worker address from the worker config, the receiver address and the current system time.
+     *
+     * @param taskExecutionContext   context whose fields are copied into the message
+     * @param messageReceiverAddress address the message is sent to
+     * @return the populated result message
+     */
+    @Override
+    public TaskExecuteResultCommand buildMessage(TaskExecutionContext taskExecutionContext,
+                                                 String messageReceiverAddress) {
+        TaskExecuteResultCommand taskExecuteResultMessage
+            = new TaskExecuteResultCommand(workerConfig.getWorkerAddress(),
+                                           messageReceiverAddress,
+                                           System.currentTimeMillis());
+        taskExecuteResultMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskExecuteResultMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskExecuteResultMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        taskExecuteResultMessage.setLogPath(taskExecutionContext.getLogPath());
+        taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
+        taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
+        taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
+        taskExecuteResultMessage.setHost(taskExecutionContext.getHost());
+        taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
+        taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
+        taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());
+        return taskExecuteResultMessage;
+    }
+
+    /**
+     * The message type sent by this sender: {@link CommandType#TASK_EXECUTE_RESULT}.
+     */
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_EXECUTE_RESULT;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
new file mode 100644
index 0000000000..7891f4be70
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * {@link MessageSender} for {@link TaskExecuteRunningCommand} messages
+ * ({@link CommandType#TASK_EXECUTE_RUNNING}).
+ */
+@Component
+public class TaskExecuteRunningMessageSender implements MessageSender<TaskExecuteRunningCommand> {
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    /**
+     * Send the message through the worker RPC client to the receiver address stored in the message.
+     *
+     * @param message the running message to send
+     * @throws RemotingException if the message cannot be sent
+     */
+    @Override
+    public void sendMessage(TaskExecuteRunningCommand message) throws RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+    }
+
+    /**
+     * Build a running message from the task execution context. The command is constructed with the
+     * worker address from the worker config, the receiver address and the current system time.
+     *
+     * @param taskExecutionContext   context whose fields are copied into the message, marked {@code @NonNull}
+     * @param messageReceiverAddress address the message is sent to, marked {@code @NonNull}
+     * @return the populated running message
+     */
+    @Override
+    public TaskExecuteRunningCommand buildMessage(@NonNull TaskExecutionContext taskExecutionContext,
+                                                  @NonNull String messageReceiverAddress) {
+        TaskExecuteRunningCommand taskExecuteRunningMessage
+            = new TaskExecuteRunningCommand(workerConfig.getWorkerAddress(),
+                                            messageReceiverAddress,
+                                            System.currentTimeMillis());
+        taskExecuteRunningMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
+        taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
+        taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
+        taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
+        return taskExecuteRunningMessage;
+    }
+
+    /**
+     * The message type sent by this sender: {@link CommandType#TASK_EXECUTE_RUNNING}.
+     */
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_EXECUTE_RUNNING;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
new file mode 100644
index 0000000000..d50c5d8997
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link MessageSender} for {@link TaskRejectCommand} messages
+ * ({@link CommandType#TASK_REJECT}).
+ */
+@Component
+public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    /**
+     * Send the message through the worker RPC client to the receiver address stored in the message.
+     *
+     * @param message the reject message to send
+     * @throws RemotingException if the message cannot be sent
+     */
+    @Override
+    public void sendMessage(TaskRejectCommand message) throws RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+    }
+
+    /**
+     * Build a reject message from the task execution context. The command is constructed with the
+     * worker address from the worker config, the given master address and the current system time.
+     *
+     * @param taskExecutionContext context whose task instance id, process instance id and host are copied
+     * @param masterAddress        receiver address (named messageReceiverAddress in {@link MessageSender})
+     * @return the populated reject message
+     */
+    @Override
+    public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
+        TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
+                                                                    masterAddress,
+                                                                    System.currentTimeMillis());
+        taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskRejectMessage.setHost(taskExecutionContext.getHost());
+        return taskRejectMessage;
+    }
+
+    /**
+     * The message type sent by this sender: {@link CommandType#TASK_REJECT}.
+     */
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_REJECT;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index d1ceca6c9e..d36c80012d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,22 +42,19 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class);
 
-    /**
-     * task callback service
-     */
     @Autowired
-    private TaskCallbackService taskCallbackService;
+    private MessageRetryRunner messageRetryRunner;
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
         HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
         if (updateCommand == null) {
             logger.error("host update command is null");
             return;
         }
         logger.info("received host update command : {}", updateCommand);
-        taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
-
+        messageRetryRunner.updateMessageHost(updateCommand.getTaskInstanceId(), updateCommand.getProcessHost());
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
deleted file mode 100644
index 21616a1f60..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
-
-/**
- * task callback service
- */
-@Service
-public class TaskCallbackService {
-
-    private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
-    private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
-
-    @Autowired
-    private TaskExecuteRunningAckProcessor taskExecuteRunningProcessor;
-
-    @Autowired
-    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
-
-    /**
-     * remote channels
-     */
-    private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
-
-    /**
-     * netty remoting client
-     */
-    private final NettyRemotingClient nettyRemotingClient;
-
-    public TaskCallbackService() {
-        final NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
-    }
-
-    /**
-     * add callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param channel channel
-     */
-    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * change remote channel
-     */
-    public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * get callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return callback channel
-     */
-    private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
-        Channel newChannel;
-        NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            if (nettyRemoteChannel.isActive()) {
-                return Optional.of(nettyRemoteChannel);
-            }
-            newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
-            if (newChannel != null) {
-                return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
-            }
-        }
-        return Optional.empty();
-    }
-
-    public long pause(int ntries) {
-        return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
-    }
-
-    /**
-     * remove callback channels
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public static void remove(int taskInstanceId) {
-        REMOTE_CHANNELS.remove(taskInstanceId);
-    }
-
-    /**
-     * send result
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param command command
-     */
-    public void send(int taskInstanceId, Command command) {
-        Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel.isPresent()) {
-            nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) {
-                    if (!future.isSuccess()) {
-                        logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
-                    }
-                }
-            });
-        } else {
-            logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
-        }
-    }
-
-    /**
-     * build task execute running command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteAckCommand
-     */
-    private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build task execute response command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteResponseCommand
-     */
-    private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
-        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        command.setAppIds(taskExecutionContext.getAppIds());
-        command.setProcessId(taskExecutionContext.getProcessId());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setEndTime(taskExecutionContext.getEndTime());
-        command.setVarPool(taskExecutionContext.getVarPool());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build TaskKillResponseCommand
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return build TaskKillResponseCommand
-     */
-    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
-        taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
-        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
-        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
-        return taskKillResponseCommand;
-    }
-
-    private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
-        taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        taskRecallCommand.setHost(taskExecutionContext.getHost());
-        return taskRecallCommand;
-    }
-
-    /**
-     * send task execute running command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    /**
-     * send task execute delay command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     */
-    public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
-        send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command());
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
similarity index 69%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index c14a2e891e..a9ce868c2d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -24,19 +24,18 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
@@ -58,16 +57,13 @@ import io.micrometer.core.annotation.Timed;
 import io.netty.channel.Channel;
 
 /**
- * worker request processor
+ * Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
  */
 @Component
-public class TaskExecuteProcessor implements NettyRequestProcessor {
+public class TaskDispatchProcessor implements NettyRequestProcessor {
 
-    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskDispatchProcessor.class);
 
-    /**
-     * worker config
-     */
     @Autowired
     private WorkerConfig workerConfig;
 
@@ -75,7 +71,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      * task callback service
      */
     @Autowired
-    private TaskCallbackService taskCallbackService;
+    private WorkerMessageSender workerMessageSender;
 
     /**
      * alert client service
@@ -99,26 +95,27 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
-            String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
-            command.getBody(), TaskExecuteRequestCommand.class);
+        TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
 
-        if (taskRequestCommand == null) {
-            logger.error("task execute request command is null");
+        if (taskDispatchCommand == null) {
+            logger.error("task execute request command content is null");
             return;
         }
-        logger.info("task execute request command : {}", taskRequestCommand);
+        final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
+        logger.info("task execute request message: {}", taskDispatchCommand);
 
-        TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
+        TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
 
         if (taskExecutionContext == null) {
             logger.error("task execution context is null");
             return;
         }
         try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+                                                        taskExecutionContext.getTaskInstanceId());
 
             TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
 
@@ -127,7 +124,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
             // todo custom logger
 
-            taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
+            taskExecutionContext.setHost(workerConfig.getWorkerAddress());
             taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
 
             if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
@@ -148,11 +145,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
                 // check if the OS user exists
                 if (!osUserExistFlag) {
                     logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
-                        taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
+                                 taskExecutionContext.getTenantCode(),
+                                 taskExecutionContext.getTaskInstanceId());
                     TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                     taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                     taskExecutionContext.setEndTime(new Date());
-                    taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+                                                             masterAddress,
+                                                             CommandType.TASK_EXECUTE_RESULT);
                     return;
                 }
 
@@ -164,33 +164,43 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
                 try {
                     FileUtils.createWorkDirIfAbsent(execLocalPath);
                 } catch (Throwable ex) {
-                    logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
-                    logger.error("create executeLocalPath fail", ex);
+                    logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
+                                 execLocalPath,
+                                 taskExecutionContext.getTaskInstanceId(),
+                                 ex);
                     TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                     taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-                    taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+                                                             masterAddress,
+                                                             CommandType.TASK_EXECUTE_RESULT);
                     return;
                 }
             }
 
-            taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
-
             // delay task process
-            long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
+            long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+                                                      taskExecutionContext.getDelayTime() * 60L);
             if (remainTime > 0) {
-                logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
+                logger.info("delay the execution of task instance {}, delay time: {} s",
+                            taskExecutionContext.getTaskInstanceId(),
+                            remainTime);
                 taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
                 taskExecutionContext.setStartTime(null);
-                taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+                workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
             }
 
             // submit task to manager
-            boolean offer = workerManager.offer(
-                new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, storageOperate));
+            boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+                                                                      masterAddress,
+                                                                      workerMessageSender,
+                                                                      alertClientService,
+                                                                      taskPluginManager,
+                                                                      storageOperate));
             if (!offer) {
                 logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
-                    workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
-                taskCallbackService.sendRecallCommand(taskExecutionContext);
+                            workerManager.getWaitSubmitQueueSize(),
+                            taskExecutionContext.getTaskInstanceId());
+                workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
             }
         } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
deleted file mode 100644
index b62770984a..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-
-/**
- * task execute running ack, from master to worker
- */
-@Component
-public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
-
-    private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
-
-    @Override
-    public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
-                String.format("invalid command type : %s", command.getType()));
-
-        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
-                command.getBody(), TaskExecuteResponseAckCommand.class);
-
-        if (taskExecuteResponseAckCommand == null) {
-            logger.error("task execute response ack command is null");
-            return;
-        }
-        logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
-
-        if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
-            ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
-            TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
-            logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
-                taskExecuteResponseAckCommand.getTaskInstanceId());
-        } else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
-            // master handle worker response error, will still retry
-        } else {
-            throw new IllegalArgumentException("Invalid task execute response ack status: "
-                + taskExecuteResponseAckCommand.getStatus());
-        }
-    }
-
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
new file mode 100644
index 0000000000..c60c283c7f
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * Worker-side handler for {@link CommandType#TASK_EXECUTE_RESULT_ACK}, the ack the master sends back
+ * after the worker reported a task execute result.
+ * <p>
+ * A SUCCESS ack calls {@link MessageRetryRunner#removeRetryMessage} for the task's
+ * {@link CommandType#TASK_EXECUTE_RESULT} message; a FAILURE ack is only logged; any other status is
+ * rejected with an {@link IllegalArgumentException}. A body that cannot be parsed into a
+ * {@link TaskExecuteAckCommand} is logged and ignored.
+ * <p>
+ * NOTE(review): the debug message logged on SUCCESS still says "remove REMOTE_CHANNELS" although this
+ * class only removes a retry message -- looks stale, confirm and reword.
+ */
+@Component
+public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(TaskExecuteResultAckProcessor.class);
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT_ACK == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
+
+        TaskExecuteAckCommand taskExecuteAckMessage = JSONUtils.parseObject(command.getBody(),
+                                                                            TaskExecuteAckCommand.class);
+
+        if (taskExecuteAckMessage == null) {
+            logger.error("task execute response ack command is null");
+            return;
+        }
+        logger.info("task execute response ack command : {}", taskExecuteAckMessage);
+
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
+            if (taskExecuteAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+                messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
+                                                      CommandType.TASK_EXECUTE_RESULT);
+                logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteAckMessage.getTaskInstanceId());
+            } else if (taskExecuteAckMessage.getStatus() == ExecutionStatus.FAILURE.getCode()) {
+                // master failed to handle the worker's result; the retry message is left in place so it is sent again
+                logger.error("Receive task execute result ack message, the message status is not success, message: {}",
+                             taskExecuteAckMessage);
+            } else {
+                throw new IllegalArgumentException("Invalid task execute response ack status: "
+                                                       + taskExecuteAckMessage.getStatus());
+            }
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
+
+        }
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 421e92a530..b85db0cb64 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -22,12 +22,13 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
@@ -42,13 +43,16 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
 
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
-            String.format("invalid command type : %s", command.getType()));
+                                    String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
-            command.getBody(), TaskExecuteRunningAckCommand.class);
+        TaskExecuteRunningAckMessage runningAckCommand = JSONUtils.parseObject(command.getBody(),
+                                                                               TaskExecuteRunningAckMessage.class);
         if (runningAckCommand == null) {
             logger.error("task execute running ack command is null");
             return;
@@ -58,7 +62,8 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
             logger.info("task execute running ack command : {}", runningAckCommand);
 
             if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
-                ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+                messageRetryRunner.removeRetryMessage(runningAckCommand.getTaskInstanceId(),
+                                                      CommandType.TASK_EXECUTE_RUNNING);
             }
         } finally {
             LoggerUtils.removeTaskInstanceIdMDC();
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index ecbbe78128..7e18fbbbef 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -28,15 +28,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -47,7 +49,10 @@ import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 
 /**
  * task kill processor
@@ -57,18 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
 
-    /**
-     * task callback service
-     */
-    @Autowired
-    private TaskCallbackService taskCallbackService;
-
     /**
      * task execute manager
      */
     @Autowired
     private WorkerManagerThread workerManager;
 
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
     /**
      * task kill process
      *
@@ -77,7 +79,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
         TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
         if (killCommand == null) {
             logger.error("task kill request command is null");
@@ -86,7 +89,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         logger.info("task kill command : {}", killCommand);
 
         int taskInstanceId = killCommand.getTaskInstanceId();
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+        TaskExecutionContext taskExecutionContext
+            = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
         if (taskExecutionContext == null) {
             logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
             return;
@@ -96,25 +100,43 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         if (processId == 0) {
             this.cancelApplication(taskInstanceId);
             workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+            sendTaskKillResponseCommand(channel, taskExecutionContext);
             logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
             return;
         }
 
         Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
 
-        taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
-
-        taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
+        taskExecutionContext.setCurrentExecutionStatus(
+            result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
         taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
+        sendTaskKillResponseCommand(channel, taskExecutionContext);
 
-        taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
         TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
 
         logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
     }
 
+    /**
+     * Reports the outcome of a kill request back to the master over the channel the request arrived on.
+     * A failed write is only logged; it is not retried.
+     *
+     * @param channel              channel the kill request was received on; the response is written to it
+     * @param taskExecutionContext context of the killed task; its status, appIds, task instance id, host
+     *                             and process id are copied into the response
+     */
+    private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
+        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+        taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        // The "not yet executed" path (processId == 0) calls this method without ever assigning appIds,
+        // so it may be null here. Splitting unconditionally would throw a NullPointerException and the
+        // master would never receive the kill response.
+        String appIds = taskExecutionContext.getAppIds();
+        if (appIds != null) {
+            taskKillResponseCommand.setAppIds(Arrays.asList(appIds.split(TaskConstants.COMMA)));
+        }
+        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+        channel.writeAndFlush(taskKillResponseCommand.convert2Command()).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.error("Submit kill response to master error, kill command: {}", taskKillResponseCommand);
+                }
+            }
+        });
+    }
+
     /**
      * do kill
      *
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
similarity index 50%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
index 769024e0aa..850630efe4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
@@ -18,15 +18,17 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
@@ -34,25 +36,35 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 
 @Component
-public class TaskRecallAckProcessor implements NettyRequestProcessor {
+public class TaskRejectAckProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);
+    private final Logger logger = LoggerFactory.getLogger(TaskRejectAckProcessor.class);
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(),
-            String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_REJECT_ACK == command.getType(),
+                                    String.format("invalid command type : %s", command.getType()));
 
-        TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
-        if (taskRecallAckCommand == null) {
+        TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(),
+                                                                          TaskRejectAckCommand.class);
+        if (taskRejectAckMessage == null) {
             return;
         }
-
-        if (taskRecallAckCommand.getStatus() ==  ExecutionStatus.SUCCESS.getCode()) {
-            ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
-            logger.debug("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
-            TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
-            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
+            if (taskRejectAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+                messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
+                                                      CommandType.TASK_REJECT);
+                logger.debug("removeRecallCache: task instance id:{}", taskRejectAckMessage.getTaskInstanceId());
+            } else {
+                logger.error("Receive task reject ack message, the message status is not success, message: {}",
+                             taskRejectAckMessage);
+            }
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
         }
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
new file mode 100644
index 0000000000..5a2752ca34
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.message.MessageSender;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * Sends worker messages to a receiver address, picking the {@link MessageSender} registered for the
+ * requested {@link CommandType}.
+ */
+@Component
+public class WorkerMessageSender {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerMessageSender.class);
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    /** Senders indexed by the message type they report through {@code getMessageType()}. */
+    private final Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
+
+    /**
+     * Collects every {@link MessageSender} bean from the application context and indexes it by message type.
+     */
+    @PostConstruct
+    public void init() {
+        Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
+        messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
+                                                                              messageSender));
+    }
+
+    // todo: use message rather than context
+    /**
+     * Builds the message for the given type, records it with the {@link MessageRetryRunner} and sends it.
+     * <p>
+     * The message is recorded before the send is attempted, so it has already been handed to the retry
+     * runner when the send fails; a {@link RemotingException} is logged and not rethrown.
+     *
+     * @param taskExecutionContext   context the message is built from
+     * @param messageReceiverAddress address the message is built for
+     * @param messageType            type used to look up the sender
+     * @throws IllegalArgumentException if no sender is registered for {@code messageType}
+     */
+    public void sendMessageWithRetry(@NonNull TaskExecutionContext taskExecutionContext,
+                                     @NonNull String messageReceiverAddress,
+                                     @NonNull CommandType messageType) {
+        MessageSender messageSender = getMessageSender(messageType);
+        BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
+        try {
+            messageRetryRunner.addRetryMessage(taskExecutionContext.getTaskInstanceId(), messageType, baseCommand);
+            messageSender.sendMessage(baseCommand);
+        } catch (RemotingException e) {
+            // pass the exception as the trailing argument so the failure cause and stack trace are logged
+            logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand, e);
+        }
+    }
+
+    /**
+     * Builds the message for the given type and sends it once, without recording it for retry.
+     * A {@link RemotingException} raised by the send is logged (with its stack trace) and not rethrown.
+     *
+     * @param taskExecutionContext   context the message is built from
+     * @param messageReceiverAddress address the message is built for
+     * @param messageType            type used to look up the sender
+     * @throws IllegalArgumentException if no sender is registered for {@code messageType}
+     */
+    public void sendMessage(@NonNull TaskExecutionContext taskExecutionContext,
+                            @NonNull String messageReceiverAddress,
+                            @NonNull CommandType messageType) {
+        MessageSender messageSender = getMessageSender(messageType);
+        BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
+        try {
+            messageSender.sendMessage(baseCommand);
+        } catch (RemotingException e) {
+            logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand, e);
+        }
+    }
+
+    /**
+     * Returns the sender registered for the message type.
+     *
+     * @param messageType type used as the lookup key
+     * @throws IllegalArgumentException if no sender is registered for {@code messageType}
+     */
+    private MessageSender getMessageSender(CommandType messageType) {
+        MessageSender messageSender = messageSenderMap.get(messageType);
+        if (messageSender == null) {
+            throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType);
+        }
+        return messageSender;
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
new file mode 100644
index 0000000000..4be20873f9
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * This rpc client is only used to send message, will not receive message, all response message should send to {@link WorkerRpcServer}.
+ * <p>
+ * NOTE(review): {@link #start()} nevertheless registers the ack processors on this client as well; confirm
+ * whether ack messages can still arrive through it.
+ */
+@Component
+public class WorkerRpcClient implements AutoCloseable {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerRpcClient.class);
+
+    @Autowired
+    private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
+
+    @Autowired
+    private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
+
+    @Autowired
+    private TaskRejectAckProcessor taskRejectAckProcessor;
+
+    /** Assigned in {@link #start()}; stays {@code null} until then. */
+    private NettyRemotingClient nettyRemotingClient;
+
+    /**
+     * Creates the netty client from a default {@link NettyClientConfig} and registers the running, result and
+     * reject ack processors on it.
+     */
+    public void start() {
+        logger.info("Worker rpc client starting");
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig);
+        // we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer.
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
+                                                   taskExecuteRunningAckProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
+        logger.info("Worker rpc client started");
+    }
+
+    /**
+     * Sends the command to the given host through the netty client created by {@link #start()}.
+     *
+     * @param host    target of the command
+     * @param command command to send
+     * @throws RemotingException propagated from the netty client's send
+     */
+    public void send(Host host, Command command) throws RemotingException {
+        nettyRemotingClient.send(host, command);
+    }
+
+    /**
+     * Closes the netty client if {@link #start()} created one.
+     */
+    @Override
+    public void close() {
+        logger.info("Worker rpc client closing");
+        // the field is only assigned in start(); guard so closing a never-started client does not throw
+        if (nettyRemotingClient != null) {
+            nettyRemotingClient.close();
+        }
+        logger.info("Worker rpc client closed");
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index be2ccc0ab4..e599046c57 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
 
 import java.io.Closeable;
 
@@ -42,19 +42,19 @@ public class WorkerRpcServer implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
 
     @Autowired
-    private TaskExecuteProcessor taskExecuteProcessor;
+    private TaskDispatchProcessor taskDispatchProcessor;
 
     @Autowired
     private TaskKillProcessor taskKillProcessor;
 
     @Autowired
-    private TaskRecallAckProcessor taskRecallAckProcessor;
+    private TaskRejectAckProcessor taskRejectAckProcessor;
 
     @Autowired
     private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
 
     @Autowired
-    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+    private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
 
     @Autowired
     private HostUpdateProcessor hostUpdateProcessor;
@@ -72,12 +72,12 @@ public class WorkerRpcServer implements Closeable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(workerConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
         // logger server
         this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
deleted file mode 100644
index 73b8d84cf7..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Retry Report Task Status Thread
- */
-@Component
-public class RetryReportTaskStatusThread extends BaseDaemonThread {
-
-    private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
-
-    /**
-     * every 5 minutes
-     */
-    private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
-
-    @Autowired
-    private TaskCallbackService taskCallbackService;
-
-    protected RetryReportTaskStatusThread() {
-        super("RetryReportTaskStatusThread");
-    }
-
-    @Override
-    public synchronized void start() {
-        logger.info("Retry report task status thread starting");
-        super.start();
-        logger.info("Retry report task status thread started");
-    }
-
-    /**
-     * retry ack/response
-     */
-    @Override
-    public void run() {
-        final ResponseCache instance = ResponseCache.get();
-
-        while (Stopper.isRunning()) {
-
-            // sleep 5 minutes
-            ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
-
-            try {
-                // todo: Only retry the send failed command
-                retryRunningCommand(instance);
-                retryResponseCommand(instance);
-                retryRecallCommand(instance);
-            } catch (Exception e) {
-                logger.warn("Retry report task status error", e);
-            }
-        }
-    }
-
-    private void retryRunningCommand(ResponseCache instance) {
-        if (!instance.getRunningCache().isEmpty()) {
-            Map<Integer, Command> runningCache = instance.getRunningCache();
-            logger.info("Send task running retry command starting, waiting to retry size: {}", runningCache.size());
-            for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
-                Integer taskInstanceId = entry.getKey();
-                Command runningCommand = entry.getValue();
-                try {
-                    taskCallbackService.send(taskInstanceId, runningCommand);
-                } catch (Exception ex) {
-                    logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
-                }
-            }
-            logger.info("Send task running retry command finished, waiting to retry size: {}", runningCache.size());
-        }
-    }
-
-    private void retryResponseCommand(ResponseCache instance) {
-        Map<Integer, Command> responseCache = instance.getResponseCache();
-        if (!responseCache.isEmpty()) {
-            logger.info("Send task response retry command starting, waiting to retry size: {}", responseCache.size());
-            for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
-                Integer taskInstanceId = entry.getKey();
-                Command responseCommand = entry.getValue();
-                try {
-                    taskCallbackService.send(taskInstanceId, responseCommand);
-                } catch (Exception ex) {
-                    logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
-                }
-            }
-            logger.info("Send task response retry command finished, waiting to retry size: {}", responseCache.size());
-        }
-    }
-
-    private void retryRecallCommand(ResponseCache instance) {
-        Map<Integer, Command> recallCache = instance.getRecallCache();
-        if (!recallCache.isEmpty()) {
-            logger.info("Send task recall retry command starting, waiting to retry size: {}", recallCache.size());
-            for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
-                Integer taskInstanceId = entry.getKey();
-                Command responseCommand = entry.getValue();
-                try {
-                    taskCallbackService.send(taskInstanceId, responseCommand);
-                } catch (Exception ex) {
-                    logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
-                }
-            }
-            logger.info("Send task recall retry command finished, waiting to retry size: {}", recallCache.size());
-        }
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 7e03f83844..9f2ad933ad 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -33,11 +33,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@@ -53,18 +53,18 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
 
+import lombok.NonNull;
+
 /**
  * task scheduler thread
  */
@@ -78,9 +78,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
     /**
      * task instance
      */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext taskExecutionContext;
+
+    private final String masterAddress;
 
-    private StorageOperate storageOperate;
+    private final StorageOperate storageOperate;
 
     /**
      * abstract task
@@ -90,12 +92,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
     /**
      * task callback service
      */
-    private TaskCallbackService taskCallbackService;
+    private final WorkerMessageSender workerMessageSender;
 
     /**
      * alert client server
      */
-    private AlertClientService alertClientService;
+    private final AlertClientService alertClientService;
 
     private TaskPluginManager taskPluginManager;
 
@@ -103,25 +105,29 @@ public class TaskExecuteThread implements Runnable, Delayed {
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
-     * @param taskCallbackService  taskCallbackService
+     * @param workerMessageSender  used for worker send message to master
      */
-    public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
-                             TaskCallbackService taskCallbackService,
-                             AlertClientService alertClientService,
+    public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
+                             @NonNull String masterAddress,
+                             @NonNull WorkerMessageSender workerMessageSender,
+                             @NonNull AlertClientService alertClientService,
                              StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
-        this.taskCallbackService = taskCallbackService;
+        this.masterAddress = masterAddress;
+        this.workerMessageSender = workerMessageSender;
         this.alertClientService = alertClientService;
         this.storageOperate = storageOperate;
     }
 
-    public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
-                             TaskCallbackService taskCallbackService,
-                             AlertClientService alertClientService,
-                             TaskPluginManager taskPluginManager,
+    public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
+                             @NonNull String masterAddress,
+                             @NonNull WorkerMessageSender workerMessageSender,
+                             @NonNull AlertClientService alertClientService,
+                             @NonNull TaskPluginManager taskPluginManager,
                              StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
-        this.taskCallbackService = taskCallbackService;
+        this.masterAddress = masterAddress;
+        this.workerMessageSender = workerMessageSender;
         this.alertClientService = alertClientService;
         this.taskPluginManager = taskPluginManager;
         this.storageOperate = storageOperate;
@@ -129,18 +135,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
     @Override
     public void run() {
-        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
-            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
-            taskExecutionContext.setStartTime(new Date());
-            taskExecutionContext.setEndTime(new Date());
-            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-            logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
-                taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
-            return;
+        try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+                                                        taskExecutionContext.getTaskInstanceId());
+            if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
+                taskExecutionContext.setStartTime(new Date());
+                taskExecutionContext.setEndTime(new Date());
+                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+                                                         masterAddress,
+                                                         CommandType.TASK_EXECUTE_RESULT);
+                logger.info("Task dry run success");
+                return;
+            }
+        } finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
         try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+                                                        taskExecutionContext.getTaskInstanceId());
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
             if (taskExecutionContext.getStartTime() == null) {
                 taskExecutionContext.setStartTime(new Date());
@@ -149,10 +163,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
             // callback task execute running
             taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-            taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
+            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+                                                     masterAddress,
+                                                     CommandType.TASK_EXECUTE_RUNNING);
 
             // copy hdfs/minio file to local
-            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
+            List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
+                                                                     taskExecutionContext.getResources());
             if (!fileDownloads.isEmpty()) {
                 downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
             }
@@ -160,8 +177,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
             taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
 
             taskExecutionContext.setTaskAppId(String.format("%s_%s",
-                    taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId()));
+                                                            taskExecutionContext.getProcessInstanceId(),
+                                                            taskExecutionContext.getTaskInstanceId()));
 
             TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
             if (null == taskChannel) {
@@ -208,7 +225,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
             taskExecutionContext.setAppIds(this.task.getAppIds());
         } finally {
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+                                                     masterAddress,
+                                                     CommandType.TASK_EXECUTE_RESULT);
             clearTaskExecPath();
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index 67abf523ed..98de315345 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -68,8 +68,10 @@ public class WorkerExecService {
 
             @Override
             public void onFailure(Throwable throwable) {
-                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
-                    , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
+                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
+                             taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
+                             taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
+                             throwable);
                 taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
             }
         };
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 1a541666fd..8b6365172a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -21,12 +21,9 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -62,7 +59,7 @@ public class WorkerManagerThread implements Runnable {
      * task callback service
      */
     @Autowired
-    private TaskCallbackService taskCallbackService;
+    private WorkerMessageSender workerMessageSender;
 
     private volatile int workerExecThreads;
 
@@ -110,20 +107,8 @@ public class WorkerManagerThread implements Runnable {
         waitSubmitQueue.stream()
                           .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
                           .forEach(waitSubmitQueue::remove);
-        sendTaskKillResponse(taskInstanceId);
     }
 
-    /**
-     * kill task before execute , like delay task
-     */
-    private void sendTaskKillResponse(Integer taskInstanceId) {
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-        if (taskExecutionContext == null) {
-            return;
-        }
-        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
-        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-    }
 
     /**
      * submit task
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
similarity index 61%
rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
rename to dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
index 2187a19281..08db9060a6 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
@@ -25,11 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
@@ -48,19 +49,20 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * test task execute processor
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
-    JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
+    JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
 @Ignore
-public class TaskExecuteProcessorTest {
+public class TaskDispatchProcessorTest {
 
     private TaskExecutionContext taskExecutionContext;
 
-    private TaskCallbackService taskCallbackService;
+    private WorkerMessageSender workerMessageSender;
 
     private ExecutorService workerExecService;
 
@@ -72,7 +74,7 @@ public class TaskExecuteProcessorTest {
 
     private Command ackCommand;
 
-    private TaskExecuteRequestCommand taskRequestCommand;
+    private TaskDispatchCommand taskRequestCommand;
 
     private AlertClientService alertClientService;
 
@@ -86,66 +88,73 @@ public class TaskExecuteProcessorTest {
         workerConfig.setExecThreads(1);
         workerConfig.setListenPort(1234);
         command = new Command();
-        command.setType(CommandType.TASK_EXECUTE_REQUEST);
-        ackCommand = new TaskExecuteRunningCommand().convert2Command();
-        taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
+        command.setType(CommandType.TASK_DISPATCH_REQUEST);
+        ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
+                                                   "127.0.0.1:5678",
+                                                   System.currentTimeMillis()).convert2Command();
+        taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
+                                                     "127.0.0.1:5678",
+                                                     "127.0.0.1:1234",
+                                                     System.currentTimeMillis());
         alertClientService = PowerMockito.mock(AlertClientService.class);
         workerExecService = PowerMockito.mock(ExecutorService.class);
-        PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
-                .thenReturn(null);
+        PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
 
         PowerMockito.mockStatic(ChannelUtils.class);
         PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
 
-        taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
-        PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
+        workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
 
         PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
-            .thenReturn(taskCallbackService);
-        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
-            .thenReturn(workerConfig);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
 
         workerManager = PowerMockito.mock(WorkerManagerThread.class);
 
         storageOperate = PowerMockito.mock(StorageOperate.class);
-        PowerMockito.when(
-                workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
-            .thenReturn(Boolean.TRUE);
+        PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+                                                                    "127.0.0.1:5678",
+                                                                    workerMessageSender,
+                                                                    alertClientService,
+                                                                    storageOperate))).thenReturn(Boolean.TRUE);
 
-        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
-            .thenReturn(workerManager);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
 
         PowerMockito.mockStatic(ThreadUtils.class);
-        PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
-            .thenReturn(workerExecService);
+        PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
+                                                                   workerConfig.getExecThreads())).thenReturn(
+            workerExecService);
 
         PowerMockito.mockStatic(JsonSerializer.class);
-        PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
-                .thenReturn(taskRequestCommand);
+        PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
+            taskRequestCommand);
 
         PowerMockito.mockStatic(JSONUtils.class);
-        PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
-                .thenReturn(taskRequestCommand);
+        PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
+            taskRequestCommand);
 
         PowerMockito.mockStatic(FileUtils.class);
         PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                taskExecutionContext.getProcessDefineCode(),
-                taskExecutionContext.getProcessDefineVersion(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId()))
-            .thenReturn(taskExecutionContext.getExecutePath());
+                                                      taskExecutionContext.getProcessDefineCode(),
+                                                      taskExecutionContext.getProcessDefineVersion(),
+                                                      taskExecutionContext.getProcessInstanceId(),
+                                                      taskExecutionContext.getTaskInstanceId())).thenReturn(
+            taskExecutionContext.getExecutePath());
         PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
 
-        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
-            null, null, null, alertClientService, storageOperate);
-        PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
-            .thenReturn(simpleTaskExecuteThread);
+        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
+                                                                                      workerMessageSender,
+                                                                                      "127.0.0.1:5678",
+                                                                                      LoggerFactory.getLogger(
+                                                                                          TaskDispatchProcessorTest.class),
+                                                                                      alertClientService,
+                                                                                      storageOperate);
+        PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
     }
 
     @Test
     public void testNormalExecution() {
-        TaskExecuteProcessor processor = new TaskExecuteProcessor();
+        TaskDispatchProcessor processor = new TaskDispatchProcessor();
         processor.process(null, command);
 
         Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@@ -154,7 +163,7 @@ public class TaskExecuteProcessorTest {
     @Test
     public void testDelayExecution() {
         taskExecutionContext.setDelayTime(1);
-        TaskExecuteProcessor processor = new TaskExecuteProcessor();
+        TaskDispatchProcessor processor = new TaskDispatchProcessor();
         processor.process(null, command);
 
         Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@@ -179,11 +188,12 @@ public class TaskExecuteProcessorTest {
     private static class SimpleTaskExecuteThread extends TaskExecuteThread {
 
         public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
-                                       TaskCallbackService taskCallbackService,
+                                       WorkerMessageSender workerMessageSender,
+                                       String masterAddress,
                                        Logger taskLogger,
                                        AlertClientService alertClientService,
                                        StorageOperate storageOperate) {
-            super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
+            super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
         }
 
         @Override
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index 90577111b9..bd492c7ff6 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
@@ -48,7 +48,7 @@ public class TaskExecuteThreadTest {
     private TaskExecutionContext taskExecutionContext;
 
     @Mock
-    private TaskCallbackService taskCallbackService;
+    private WorkerMessageSender workerMessageSender;
 
     @Mock
     private AlertClientService alertClientService;
@@ -61,8 +61,12 @@ public class TaskExecuteThreadTest {
 
     @Test
     public void checkTest() {
-        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
-            alertClientService, taskPluginManager, storageOperate);
+        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
+                                                                    "127.0.0.1:5678",
+                                                                    workerMessageSender,
+                                                                    alertClientService,
+                                                                    taskPluginManager,
+                                                                    storageOperate);
 
         String path = "/";
         Map<String, String> projectRes = new HashMap<>();