You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/06/28 08:08:41 UTC

[dolphinscheduler] branch dev updated: [Bug] [Master] Worker failover will cause task cannot be failover (#10631)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66624c5c86 [Bug] [Master] Worker failover will cause task cannot be failover (#10631)
66624c5c86 is described below

commit 66624c5c86dd0bb2ae658eaf68691e03c86ae5e0
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jun 28 16:08:35 2022 +0800

    [Bug] [Master] Worker failover will cause task cannot be failover (#10631)
    
    * fix worker failover may lose event
---
 .../dolphinscheduler/common/enums/StateEvent.java  |  77 +---
 .../dolphinscheduler/common/utils/OSUtils.java     |  21 +-
 .../cache/ProcessInstanceExecCacheManager.java     |   4 +-
 .../impl/ProcessInstanceExecCacheManagerImpl.java  |   7 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |   8 +-
 .../master/metrics/ProcessInstanceMetrics.java     |  49 ++-
 .../master/runner/MasterSchedulerService.java      |  73 ++--
 .../master/runner/StateWheelExecuteThread.java     |  14 +-
 .../master/runner/WorkflowExecuteRunnable.java     |  27 +-
 .../master/runner/WorkflowExecuteThreadPool.java   |  12 +-
 .../master/runner/task/BaseTaskProcessor.java      |  21 +-
 .../server/master/runner/task/TaskInstanceKey.java |  62 +---
 .../server/master/service/FailoverService.java     | 104 +++---
 .../src/main/resources/application.yaml            |   2 +-
 .../ProcessInstanceExecCacheManagerImplTest.java   |   2 +-
 .../master/dispatch/ExecutionContextTestUtils.java |   3 +-
 .../executor/NettyExecutorManagerTest.java         |   2 +-
 .../server/master/service/FailoverServiceTest.java |  21 +-
 .../dolphinscheduler/meter/MeterConfiguration.java |   4 -
 .../dolphinscheduler/remote/command/Command.java   |   3 +
 .../remote/command/TaskExecuteRequestCommand.java  |  42 +--
 .../server/utils/ProcessUtils.java                 |   8 +-
 .../service/process/ProcessService.java            |   8 +-
 .../service/process/ProcessServiceImpl.java        |  23 +-
 .../service/process/ProcessServiceTest.java        |  53 +--
 .../plugin/task/api/TaskExecutionContext.java      | 409 +--------------------
 .../plugin/task/api/enums/ExecutionStatus.java     |  23 ++
 .../server/worker/metrics/WorkerServerMetrics.java |  25 +-
 .../worker/processor/TaskCallbackService.java      |  26 +-
 .../worker/processor/TaskExecuteProcessor.java     |   3 +-
 .../worker/runner/RetryReportTaskStatusThread.java |  86 +++--
 .../server/worker/runner/TaskExecuteThread.java    |   3 +-
 .../worker/processor/TaskExecuteProcessorTest.java |   4 +-
 33 files changed, 393 insertions(+), 836 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index 405df09d3e..7f4be924dd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 
 import io.netty.channel.Channel;
+import lombok.Data;
 
 /**
  * state event
  */
+@Data
 public class StateEvent {
 
     /**
@@ -45,79 +47,4 @@ public class StateEvent {
 
     private Channel channel;
 
-    public ExecutionStatus getExecutionStatus() {
-        return executionStatus;
-    }
-
-    public void setExecutionStatus(ExecutionStatus executionStatus) {
-        this.executionStatus = executionStatus;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public long getTaskCode() {
-        return taskCode;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
-
-    public String getContext() {
-        return context;
-    }
-
-    public void setContext(String context) {
-        this.context = context;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public void setTaskCode(long taskCode) {
-        this.taskCode = taskCode;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel) {
-        this.channel = channel;
-    }
-
-    @Override
-    public String toString() {
-        return "State Event :"
-                + "key: " + key
-                + " type: " + type
-                + " executeStatus: " + executionStatus
-                + " task instance id: " + taskInstanceId
-                + " process instance id: " + processInstanceId
-                + " context: " + context
-                ;
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public void setKey(String key) {
-        this.key = key;
-    }
-
-    public void setType(StateEventType type) {
-        this.type = type;
-    }
-
-    public StateEventType getType() {
-        return this.type;
-    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
index 723a6639c3..a072d5a232 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
@@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.common.utils;
 
 import org.apache.dolphinscheduler.common.shell.ShellExecutor;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
@@ -460,24 +460,23 @@ public class OSUtils {
     }
 
     /**
-     * check memory and cpu usage
+     * Check whether the CPU load or memory usage exceeds the given thresholds.
      *
-     * @param maxCpuloadAvg maxCpuloadAvg
+     * @param maxCpuLoadAvg  maxCpuLoadAvg
      * @param reservedMemory reservedMemory
-     * @return check memory and cpu usage
+     * @return True if the CPU load or memory usage exceeds the given thresholds.
      */
-    public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) {
+    public static Boolean isOverload(double maxCpuLoadAvg, double reservedMemory) {
         // system load average
         double loadAverage = loadAverage();
         // system available physical memory
         double availablePhysicalMemorySize = availablePhysicalMemorySize();
-        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
-            logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
-                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
-            return false;
-        } else {
+        if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) {
+            logger.warn("Current cpu load average {} is too high or available memory {}G is too low, under max.cpuLoad.avg={} and reserved.memory={}G",
+                loadAverage, availablePhysicalMemorySize, maxCpuLoadAvg, reservedMemory);
             return true;
         }
+        return false;
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
index 2f5f6dc472..59e064105a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.cache;
 
+import lombok.NonNull;
+
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import java.util.Collection;
@@ -55,7 +57,7 @@ public interface ProcessInstanceExecCacheManager {
      * @param processInstanceId     processInstanceId
      * @param workflowExecuteThread if it is null, will not be cached
      */
-    void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
+    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
 
     /**
      * get all WorkflowExecuteThread from cache
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
index dc562d37bd..8f00029a3e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
@@ -30,6 +30,8 @@ import org.springframework.stereotype.Component;
 
 import com.google.common.collect.ImmutableList;
 
+import lombok.NonNull;
+
 /**
  * cache of process instance id and WorkflowExecuteThread
  */
@@ -59,10 +61,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
     }
 
     @Override
-    public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
-        if (workflowExecuteThread == null) {
-            return;
-        }
+    public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) {
         processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index eab33a07a6..e8cf4b2919 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -131,9 +131,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
                     for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                         taskPriorityQueue.put(dispatchFailedTask);
                     }
-                    // If there are tasks in a cycle that cannot find the worker group,
-                    // sleep for 1 second
-                    if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
+                    // If every fetched task failed to dispatch, sleep for 1s to avoid driving up master CPU usage.
+                    if (fetchTaskNum == failedDispatchTasks.size()) {
                         TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                     }
                 }
@@ -218,8 +217,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     }
 
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
+        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
         return requestCommand.convert2Command();
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
index 1693972dac..ad0e479e5b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.metrics;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
 
 public final class ProcessInstanceMetrics {
 
@@ -29,15 +31,25 @@ public final class ProcessInstanceMetrics {
         throw new UnsupportedOperationException("Utility class");
     }
 
+    private static final Timer COMMAND_QUERY_TIMETER =
+        Timer.builder("ds.workflow.command.query.duration")
+            .description("Command query duration")
+            .register(Metrics.globalRegistry);
+
+    private static final Timer PROCESS_INSTANCE_GENERATE_TIMER =
+        Timer.builder("ds.workflow.instance.generate.duration")
+            .description("Process instance generated duration")
+            .register(Metrics.globalRegistry);
+
     private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
-            Counter.builder("ds.workflow.instance.submit.count")
-                    .description("Process instance submit total count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.workflow.instance.submit.count")
+            .description("Process instance submit total count")
+            .register(Metrics.globalRegistry);
 
     private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
-            Counter.builder("ds.workflow.instance.timeout.count")
-                    .description("Process instance timeout total count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.workflow.instance.timeout.count")
+            .description("Process instance timeout total count")
+            .register(Metrics.globalRegistry);
 
     private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
             Counter.builder("ds.workflow.instance.finish.count")
@@ -55,19 +67,27 @@ public final class ProcessInstanceMetrics {
                     .register(Metrics.globalRegistry);
 
     private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
-            Counter.builder("ds.workflow.instance.stop.count")
-                    .description("Process instance stop total count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.workflow.instance.stop.count")
+            .description("Process instance stop total count")
+            .register(Metrics.globalRegistry);
 
     private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
-            Counter.builder("ds.workflow.instance.failover.count")
-                    .description("Process instance failover total count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.workflow.instance.failover.count")
+            .description("Process instance failover total count")
+            .register(Metrics.globalRegistry);
+
+    public static void recordCommandQueryTime(long milliseconds) {
+        COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    public static void recordProcessInstanceGenerateTime(long milliseconds) {
+        PROCESS_INSTANCE_GENERATE_TIMER.record(milliseconds, TimeUnit.MILLISECONDS);
+    }
 
     public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) {
         Gauge.builder("ds.workflow.instance.running", function)
-                .description("The current running process instance count")
-                .register(Metrics.globalRegistry);
+            .description("The current running process instance count")
+            .register(Metrics.globalRegistry);
     }
 
     public static void incProcessInstanceSubmit() {
@@ -97,5 +117,4 @@ public final class ProcessInstanceMetrics {
     public static void incProcessInstanceFailover() {
         PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
     }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 3f5a715d7c..bce8753781 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -62,31 +63,19 @@ public class MasterSchedulerService extends BaseDaemonThread {
      */
     private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
 
-    /**
-     * dolphinscheduler database interface
-     */
     @Autowired
     private ProcessService processService;
 
-    /**
-     * master config
-     */
     @Autowired
     private MasterConfig masterConfig;
 
-    /**
-     * alert manager
-     */
     @Autowired
     private ProcessAlertManager processAlertManager;
 
-    /**
-     * netty remoting client
-     */
     private NettyRemotingClient nettyRemotingClient;
 
     @Autowired
-    NettyExecutorManager nettyExecutorManager;
+    private NettyExecutorManager nettyExecutorManager;
 
     /**
      * master prepare exec service
@@ -108,6 +97,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private CuringGlobalParamsService curingGlobalParamsService;
 
+    private String masterAddress;
+
     protected MasterSchedulerService() {
         super("MasterCommandLoopThread");
     }
@@ -119,6 +110,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
         this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
     }
 
     @Override
@@ -142,13 +134,13 @@ public class MasterSchedulerService extends BaseDaemonThread {
     public void run() {
         while (Stopper.isRunning()) {
             try {
-                boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
-                if (!runCheckFlag) {
+                boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
+                if (isOverload) {
                     MasterServerMetrics.incMasterOverload();
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     continue;
                 }
-                scheduleProcess();
+                scheduleWorkflow();
             } catch (InterruptedException interruptedException) {
                 logger.warn("Master schedule service interrupted, close the loop", interruptedException);
                 Thread.currentThread().interrupt();
@@ -160,13 +152,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
     }
 
     /**
-     * 1. get command by slot
-     * 2. donot handle command if slot is empty
+     * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
      */
-    private void scheduleProcess() throws InterruptedException {
+    private void scheduleWorkflow() throws InterruptedException {
         List<Command> commands = findCommands();
         if (CollectionUtils.isEmpty(commands)) {
-            //indicate that no command ,sleep for 1s
+            // no command was found, sleep for 1s
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
@@ -181,7 +172,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
             try {
                 LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                 logger.info("Master schedule service starting workflow instance");
-                WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                     processInstance
                     , processService
                     , nettyExecutorManager
@@ -194,9 +185,14 @@ public class MasterSchedulerService extends BaseDaemonThread {
                 if (processInstance.getTimeout() > 0) {
                     stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
                 }
-                workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
+                ProcessInstanceMetrics.incProcessInstanceSubmit();
+                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
                 logger.info("Master schedule service started workflow instance");
 
+            } catch (Exception ex) {
+                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
             } finally {
                 LoggerUtils.removeWorkflowInstanceIdMDC();
             }
@@ -204,21 +200,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
     }
 
     private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
+        long commandTransformStartTime = System.currentTimeMillis();
         logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
         List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
         CountDownLatch latch = new CountDownLatch(commands.size());
         for (final Command command : commands) {
             masterPrepareExecService.execute(() -> {
                 try {
+                    // todo: this check is not safe, the slot may change after command transform.
                     // slot check again
                     SlotCheckState slotCheckState = slotCheck(command);
                     if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
                         logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                         return;
                     }
-                    ProcessInstance processInstance = processService.handleCommand(logger,
-                            getLocalAddress(),
-                            command);
+                    ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
                     if (processInstance != null) {
                         processInstances.add(processInstance);
                         logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
@@ -236,24 +232,26 @@ public class MasterSchedulerService extends BaseDaemonThread {
         latch.await();
         logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
             commands.size(), processInstances.size());
+        ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
         return processInstances;
     }
 
     private List<Command> findCommands() {
+        long scheduleStartTime = System.currentTimeMillis();
+        int thisMasterSlot = ServerNodeManager.getSlot();
+        int masterCount = ServerNodeManager.getMasterSize();
+        if (masterCount <= 0) {
+            logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+            return Collections.emptyList();
+        }
         int pageNumber = 0;
         int pageSize = masterConfig.getFetchCommandNum();
-        List<Command> result = new ArrayList<>();
-        if (Stopper.isRunning()) {
-            int thisMasterSlot = ServerNodeManager.getSlot();
-            int masterCount = ServerNodeManager.getMasterSize();
-            if (masterCount > 0) {
-                result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
-                if (CollectionUtils.isNotEmpty(result)) {
-                    logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
-                        result.size(), thisMasterSlot, masterCount);
-                }
-            }
+        final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+        if (CollectionUtils.isNotEmpty(result)) {
+            logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+                result.size(), thisMasterSlot, masterCount);
         }
+        ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
         return result;
     }
 
@@ -271,7 +269,4 @@ public class MasterSchedulerService extends BaseDaemonThread {
         return state;
     }
 
-    private String getLocalAddress() {
-        return NetUtils.getAddr(masterConfig.getListenPort());
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 53acb4ce83..65c7db924d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -59,24 +59,24 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
     private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
 
     /**
-     * process timeout check list
+     * ProcessInstance timeout check list, element is the processInstanceId.
      */
-    private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
 
     /**
      * task time out check list
      */
-    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
 
     /**
      * task retry check list
      */
-    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
 
     /**
      * task state check list
      */
-    private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
 
     @Autowired
     private MasterConfig masterConfig;
@@ -116,8 +116,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         logger.info("Success add workflow instance into timeout check list");
     }
 
-    public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
-        boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
+    public void removeProcess4TimeoutCheck(int processInstanceId) {
+        boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
         if (removeFlag) {
             logger.info("Success remove workflow instance from timeout check list");
         } else {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9fa452c66e..42654665a2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -435,6 +435,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
         if (task.getState().typeIsFinished()) {
             if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
+                logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
                 return true;
             }
             taskFinished(task);
@@ -461,11 +462,9 @@ public class WorkflowExecuteRunnable implements Runnable {
     }
 
     private void taskFinished(TaskInstance taskInstance) {
-        logger.info("work flow {} task id:{} code:{} state:{} ",
-                processInstance.getId(),
-                taskInstance.getId(),
-                taskInstance.getTaskCode(),
-                taskInstance.getState());
+        logger.info("TaskInstance finished task code:{} state:{} ",
+            taskInstance.getTaskCode(),
+            taskInstance.getState());
 
         activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
         stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@@ -481,12 +480,13 @@ public class WorkflowExecuteRunnable implements Runnable {
             }
         } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
             // retry task
+            logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
             retryTaskInstance(taskInstance);
         } else if (taskInstance.getState().typeIsFailure()) {
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
             // There are child nodes and the failure policy is: CONTINUE
-            if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
-                    && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
+            if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
+                && DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
                 submitPostNode(Long.toString(taskInstance.getTaskCode()));
             } else {
                 errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@@ -495,6 +495,7 @@ public class WorkflowExecuteRunnable implements Runnable {
                 }
             }
         } else if (taskInstance.getState().typeIsFinished()) {
+            // todo: when the task instance state is pause, it should not be put into completeTaskMap
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
         }
 
@@ -536,7 +537,7 @@ public class WorkflowExecuteRunnable implements Runnable {
         }
         TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
         if (newTaskInstance == null) {
-            logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
+            logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
             return;
         }
         waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@@ -634,7 +635,7 @@ public class WorkflowExecuteRunnable implements Runnable {
      * check if task instance exist by task code
      */
     public boolean checkTaskInstanceByCode(long taskCode) {
-        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+        if (taskInstanceMap.isEmpty()) {
             return false;
         }
         for (TaskInstance taskInstance : taskInstanceMap.values()) {
@@ -649,7 +650,7 @@ public class WorkflowExecuteRunnable implements Runnable {
      * check if task instance exist by id
      */
     public boolean checkTaskInstanceById(int taskInstanceId) {
-        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
+        if (taskInstanceMap.isEmpty()) {
             return false;
         }
         return taskInstanceMap.containsKey(taskInstanceId);
@@ -697,7 +698,7 @@ public class WorkflowExecuteRunnable implements Runnable {
 
             if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
                 // serial wait execution type needs to wake up the waiting process
-                if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){
+                if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
                     endProcess();
                     return true;
                 }
@@ -1296,6 +1297,10 @@ public class WorkflowExecuteRunnable implements Runnable {
         }
     }
 
+    public Collection<TaskInstance> getAllTaskInstances() {
+        return taskInstanceMap.values();
+    }
+
     private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
         //for this taskInstance all the param in this part is IN.
         thisProperty.setDirect(Direct.IN);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index eb8bcd94bb..4085e99051 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -51,7 +51,7 @@ import com.google.common.base.Strings;
 import lombok.NonNull;
 
 /**
- * Used to execute {@link WorkflowExecuteRunnable}, when
+ * Used to execute {@link WorkflowExecuteRunnable}.
  */
 @Component
 public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@@ -99,14 +99,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         logger.info("Submit state event success, stateEvent: {}", stateEvent);
     }
 
-    /**
-     * Start the given workflow.
-     */
-    public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
-        ProcessInstanceMetrics.incProcessInstanceSubmit();
-        submit(workflowExecuteThread);
-    }
-
     /**
      * Handle the events belong to the given workflow.
      */
@@ -138,7 +130,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
                 try {
                     LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
                     if (workflowExecuteThread.workFlowFinish()) {
-                        stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
+                        stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
                         processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                         notifyProcessChanged(workflowExecuteThread.getProcessInstance());
                         logger.info("Workflow instance is finished.");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 329f72d1b1..16aa9d9957 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -96,6 +96,8 @@ import org.slf4j.LoggerFactory;
 
 import com.zaxxer.hikari.HikariDataSource;
 
+import lombok.NonNull;
+
 public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
@@ -114,22 +116,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected int commitInterval;
 
-    protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
+    protected ProcessService processService;
 
-    protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+    protected MasterConfig masterConfig;
 
-    protected TaskPluginManager taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
+    protected TaskPluginManager taskPluginManager;
 
     protected String threadLoggerInfoName;
 
     @Override
-    public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
-        if (processService == null) {
-            processService = SpringApplicationContext.getBean(ProcessService.class);
-        }
-        if (masterConfig == null) {
-            masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
-        }
+    public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
+        processService = SpringApplicationContext.getBean(ProcessService.class);
+        masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+        taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
         this.taskInstance = taskInstance;
         this.processInstance = processInstance;
         this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -245,7 +244,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     @Override
     public String getType() {
-        return null;
+        throw new UnsupportedOperationException("This abstract class doesn's has type");
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
index 5b3d788c8c..d9ca82a45c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
@@ -20,64 +20,22 @@ package org.apache.dolphinscheduler.server.master.runner.task;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
-import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
 
 /**
- * task instance key, processInstanceId
+ * Used to identify a task instance.
  */
+@Data
+@AllArgsConstructor
 public class TaskInstanceKey {
-    private int processInstanceId;
-    private long taskCode;
-    private int taskVersion;
+    private final int processInstanceId;
+    private final long taskCode;
+    private final int taskVersion;
 
-    public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
-        this.processInstanceId = processInstanceId;
-        this.taskCode = taskCode;
-        this.taskVersion = taskVersion;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public long getTaskCode() {
-        return taskCode;
-    }
-
-    public int getTaskVersion() {
-        return taskVersion;
-    }
-
-    public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
-        if (processInstance == null || taskInstance == null) {
-            return null;
-        }
+    public static TaskInstanceKey getTaskInstanceKey(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
     }
 
-    @Override
-    public String toString() {
-        return "TaskKey{"
-                + "processInstanceId=" + processInstanceId
-                + ", taskCode=" + taskCode
-                + ", taskVersion=" + taskVersion
-                + '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
-        return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(processInstanceId, taskCode, taskVersion);
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 32f6562ca0..b377ceedbf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,19 +17,19 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
@@ -49,6 +49,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.springframework.stereotype.Component;
 
 import io.micrometer.core.annotation.Counted;
 import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
 
 /**
  * failover service
@@ -67,15 +69,20 @@ public class FailoverService {
     private final MasterConfig masterConfig;
     private final ProcessService processService;
     private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
-    public FailoverService(RegistryClient registryClient,
-                           MasterConfig masterConfig,
-                           ProcessService processService,
-                           WorkflowExecuteThreadPool workflowExecuteThreadPool) {
-        this.registryClient = checkNotNull(registryClient);
-        this.masterConfig = checkNotNull(masterConfig);
-        this.processService = checkNotNull(processService);
-        this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
+    private final ProcessInstanceExecCacheManager cacheManager;
+    private final String localAddress;
+
+    public FailoverService(@NonNull RegistryClient registryClient,
+                           @NonNull MasterConfig masterConfig,
+                           @NonNull ProcessService processService,
+                           @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                           @NonNull ProcessInstanceExecCacheManager cacheManager) {
+        this.registryClient = registryClient;
+        this.masterConfig = masterConfig;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.cacheManager = cacheManager;
+        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
     }
 
     /**
@@ -88,7 +95,7 @@ public class FailoverService {
         if (CollectionUtils.isEmpty(hosts)) {
             return;
         }
-        LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
+        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
 
         for (String host : hosts) {
             failoverMasterWithLock(host);
@@ -174,11 +181,10 @@ public class FailoverService {
     }
 
     /**
-     * failover worker tasks
+     * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belonging to the given worker,
+     * and failover these tasks.
      * <p>
-     * 1. kill yarn job if there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. failover all tasks when workerHost is null
+     * Note: When we do worker failover, the master will only failover the process instances that belong to the current master.
      *
      * @param workerHost worker host
      */
@@ -188,29 +194,40 @@ public class FailoverService {
         }
 
         long startTime = System.currentTimeMillis();
-        List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
-        Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
+        // query the task instances from the cache instead of the DB, so that the cached objects are updated directly
+        final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
+            .stream()
+            .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
+            .filter(taskInstance ->
+                workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
+            .collect(Collectors.toList());
+        final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
         LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
-        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
+        final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
-            ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
-            if (processInstance == null) {
-                processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
+            try {
+                ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
                 if (processInstance == null) {
-                    LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
-                        taskInstance.getProcessInstanceId(), taskInstance.getId());
+                    processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
+                    if (processInstance == null) {
+                        LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
+                            taskInstance.getProcessInstanceId(), taskInstance.getId());
+                        continue;
+                    }
+                    processInstanceCacheMap.put(processInstance.getId(), processInstance);
+                }
+
+                // only failover tasks whose process instance is owned by this master (host equals localAddress).
+                if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
                     continue;
                 }
-                processInstanceCacheMap.put(processInstance.getId(), processInstance);
-            }
 
-            // only failover the task owned myself if worker down.
-            if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
-                continue;
+                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+                failoverTaskInstance(processInstance, taskInstance, workerServers);
+            } finally {
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
-
-            LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-            failoverTaskInstance(processInstance, taskInstance, workerServers);
         }
         LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
     }
@@ -221,17 +238,14 @@ public class FailoverService {
      * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
      * 2. change task state from running to need failover.
      * 3. try to notify local master
+     *
      * @param processInstance
      * @param taskInstance
-     * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
+     * @param servers         if failover master, servers contains master servers and worker servers; if failover worker, servers contains worker servers.
      */
-    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
-        if (processInstance == null) {
-            LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
-                taskInstance.getProcessInstanceId(), taskInstance.getId());
-            return;
-        }
+    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
         if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
+            LOGGER.info("The taskInstance doesn't need to failover");
             return;
         }
         TaskMetrics.incTaskFailover();
@@ -240,6 +254,7 @@ public class FailoverService {
         taskInstance.setProcessInstance(processInstance);
 
         if (!isMasterTask) {
+            LOGGER.info("The failover taskInstance is not master task");
             TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
@@ -249,6 +264,8 @@ public class FailoverService {
                 // only kill yarn job if exists , the local thread has exited
                 ProcessUtils.killYarnJob(taskExecutionContext);
             }
+        } else {
+            LOGGER.info("The failover taskInstance is a master task");
         }
 
         taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
@@ -278,7 +295,7 @@ public class FailoverService {
         while (iterator.hasNext()) {
             String host = iterator.next();
             if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
-                if (!getLocalAddress().equals(host)) {
+                if (!localAddress.equals(host)) {
                     iterator.remove();
                 }
             }
@@ -390,11 +407,8 @@ public class FailoverService {
         return serverStartupTime;
     }
 
-    /**
-     * get local address
-     */
-    String getLocalAddress() {
-        return NetUtils.getAddr(masterConfig.getListenPort());
+    public String getLocalAddress() {
+        return localAddress;
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 619322bfab..a908ac3072 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -92,7 +92,7 @@ master:
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
   exec-threads: 100
-  # master dispatch task number per batch
+  # master dispatch task number per batch; if all tasks in a batch fail to dispatch, will sleep 1s before the next batch.
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
   host-selector: lower_weight
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
index d7b53f3c34..96bdfe4914 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
@@ -56,7 +56,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
         Assert.assertTrue(processInstanceExecCacheManager.contains(1));
     }
 
-    @Test
+    @Test(expected = NullPointerException.class)
     public void testCacheNull() {
         processInstanceExecCacheManager.cache(2, null);
         WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index 06265d2e35..b2c7d63cd7 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -49,8 +49,7 @@ public class ExecutionContextTestUtils {
                 .buildProcessDefinitionRelatedInfo(processDefinition)
                 .create();
 
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(context));
+        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
         Command command = requestCommand.convert2Command();
 
         ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 88c6655d2a..fdd79552b1 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
 
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
         TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
+        requestCommand.setTaskExecutionContext(taskExecutionContext);
         return requestCommand.convert2Command();
     }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 4e3d99347a..fc9bcca3aa 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -18,8 +18,8 @@
 package org.apache.dolphinscheduler.server.master.service;
 
 import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
+
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
 
@@ -31,7 +31,9 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -45,7 +47,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -63,7 +64,6 @@ import com.google.common.collect.Lists;
 @PrepareForTest({RegistryClient.class})
 @PowerMockIgnore({"javax.management.*"})
 public class FailoverServiceTest {
-    @InjectMocks
     private FailoverService failoverService;
 
     @Mock
@@ -78,6 +78,9 @@ public class FailoverServiceTest {
     @Mock
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
+    @Mock
+    private ProcessInstanceExecCacheManager cacheManager;
+
     private static int masterPort = 5678;
     private static int workerPort = 1234;
 
@@ -95,6 +98,7 @@ public class FailoverServiceTest {
         springApplicationContext.setApplicationContext(applicationContext);
 
         given(masterConfig.getListenPort()).willReturn(masterPort);
+        failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
 
         testMasterHost = failoverService.getLocalAddress();
         String ip = testMasterHost.split(":")[0];
@@ -182,7 +186,16 @@ public class FailoverServiceTest {
 
     @Test
     public void failoverWorkTest() {
+        workerTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
+        Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance));
+        Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance);
+
+        Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable));
+        Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable);
+
+
         failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
-        Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+        Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState());
     }
 }
diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
index b7aecf7ee6..64259f2f4f 100644
--- a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
+++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
@@ -20,10 +20,6 @@
 
 package org.apache.dolphinscheduler.meter;
 
-import javax.annotation.PostConstruct;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index 9baa321a9e..527a269411 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
 import java.io.Serializable;
@@ -25,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Command implements Serializable {
 
+    private static final long serialVersionUID = -1L;
+
     private static final AtomicLong REQUEST_ID = new AtomicLong(1);
 
     public static final byte MAGIC = (byte) 0xbabe;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
index 5b2e33922c..000e3f4e02 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
@@ -18,39 +18,23 @@
 package org.apache.dolphinscheduler.remote.command;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
 import java.io.Serializable;
 
-/**
- *  execute task request command
- */
-public class TaskExecuteRequestCommand implements Serializable {
-
-    /**
-     *  task execution context
-     */
-    private String taskExecutionContext;
-
-    public String getTaskExecutionContext() {
-        return taskExecutionContext;
-    }
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-    public void setTaskExecutionContext(String taskExecutionContext) {
-        this.taskExecutionContext = taskExecutionContext;
-    }
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskExecuteRequestCommand implements Serializable {
 
-    public TaskExecuteRequestCommand() {
-    }
+    private static final long serialVersionUID = -1L;
 
-    public TaskExecuteRequestCommand(String taskExecutionContext) {
-        this.taskExecutionContext = taskExecutionContext;
-    }
+    private TaskExecutionContext taskExecutionContext;
 
-    /**
-     *  package request command
-     *
-     * @return command
-     */
     public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_EXECUTE_REQUEST);
@@ -59,10 +43,4 @@ public class TaskExecuteRequestCommand implements Serializable {
         return command;
     }
 
-    @Override
-    public String toString() {
-        return "TaskExecuteRequestCommand{"
-                + "taskExecutionContext='" + taskExecutionContext + '\''
-                + '}';
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 2637d6a11a..125b2d82db 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
+
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -43,6 +44,8 @@ import java.util.regex.Pattern;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.NonNull;
+
 /**
  * mainly used to get the start command line of a process.
  */
@@ -181,7 +184,10 @@ public class ProcessUtils {
      * @param taskExecutionContext taskExecutionContext
      * @return yarn application ids
      */
-    public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
+    public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
+        if (taskExecutionContext.getLogPath() == null) {
+            return Collections.emptyList();
+        }
         try {
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             String log;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 9ff8689fd3..8d61a9a460 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -52,16 +52,16 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import org.slf4j.Logger;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.transaction.annotation.Transactional;
+
 public interface ProcessService {
     @Transactional
-    ProcessInstance handleCommand(Logger logger, String host, Command command);
+    ProcessInstance handleCommand(String host, Command command);
 
     void moveToErrorCommand(Command command, String message);
 
@@ -161,8 +161,6 @@ public interface ProcessService {
 
     void changeOutParam(TaskInstance taskInstance);
 
-    List<String> convertIntListToString(List<Integer> intList);
-
     Schedule querySchedule(int id);
 
     List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a8ae3f33a9..fc436ee4bd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import static java.util.stream.Collectors.toSet;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
 import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
 import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
+import static java.util.stream.Collectors.toSet;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -173,13 +174,6 @@ public class ProcessServiceImpl implements ProcessService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-        ExecutionStatus.DISPATCH.ordinal(),
-        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-        ExecutionStatus.DELAY_EXECUTION.ordinal(),
-        ExecutionStatus.READY_PAUSE.ordinal(),
-        ExecutionStatus.READY_STOP.ordinal()};
-
     @Autowired
     private UserMapper userMapper;
 
@@ -290,7 +284,7 @@ public class ProcessServiceImpl implements ProcessService {
      */
     @Override
     @Transactional
-    public ProcessInstance handleCommand(Logger logger, String host, Command command) {
+    public ProcessInstance handleCommand(String host, Command command) {
         ProcessInstance processInstance = constructProcessInstance(command, host);
         // cannot construct process instance, return null
         if (processInstance == null) {
@@ -1973,8 +1967,7 @@ public class ProcessServiceImpl implements ProcessService {
      * @param intList intList
      * @return string list
      */
-    @Override
-    public List<String> convertIntListToString(List<Integer> intList) {
+    private List<String> convertIntListToString(List<Integer> intList) {
         if (intList == null) {
             return new ArrayList<>();
         }
@@ -2039,12 +2032,12 @@ public class ProcessServiceImpl implements ProcessService {
      */
     @Override
     public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
-        return processInstanceMapper.queryByHostAndStatus(host, stateArray);
+        return processInstanceMapper.queryByHostAndStatus(host, ExecutionStatus.getNeedFailoverWorkflowInstanceState());
     }
 
     @Override
     public List<String> queryNeedFailoverProcessInstanceHost() {
-        return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
+        return processInstanceMapper.queryNeedFailoverProcessInstanceHost(ExecutionStatus.getNeedFailoverWorkflowInstanceState());
     }
 
     /**
@@ -2081,7 +2074,7 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
         return taskInstanceMapper.queryByHostAndStatus(host,
-            stateArray);
+            ExecutionStatus.getNeedFailoverWorkflowInstanceState());
     }
 
     /**
@@ -2215,7 +2208,7 @@ public class ProcessServiceImpl implements ProcessService {
         return processInstanceMapper.queryLastRunningProcess(definitionCode,
             startTime,
             endTime,
-            stateArray);
+            ExecutionStatus.getNeedFailoverWorkflowInstanceState());
     }
 
     /**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 0a859235ac..0dd9734bd1 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,7 +17,12 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import com.fasterxml.jackson.databind.JsonNode;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+
+import static org.mockito.ArgumentMatchers.any;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -72,9 +77,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.spi.params.base.FormType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -88,17 +101,7 @@ import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.mockito.ArgumentMatchers.any;
+import com.fasterxml.jackson.databind.JsonNode;
 
 /**
  * process service test
@@ -289,8 +292,8 @@ public class ProcessServiceTest {
         command.setProcessDefinitionCode(222);
         command.setCommandType(CommandType.REPEAT_RUNNING);
         command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
-                + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
-        Assert.assertNull(processService.handleCommand(logger, host, command));
+            + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
+        Assert.assertNull(processService.handleCommand(host, command));
 
         int definitionVersion = 1;
         long definitionCode = 123;
@@ -325,7 +328,7 @@ public class ProcessServiceTest {
         Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command1));
+        Assert.assertNotNull(processService.handleCommand(host, command1));
 
         Command command2 = new Command();
         command2.setId(2);
@@ -335,7 +338,7 @@ public class ProcessServiceTest {
         command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
         command2.setProcessInstanceId(processInstanceId);
         Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command2));
+        Assert.assertNotNull(processService.handleCommand(host, command2));
 
         Command command3 = new Command();
         command3.setId(3);
@@ -345,7 +348,7 @@ public class ProcessServiceTest {
         command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
         command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
         Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command3));
+        Assert.assertNotNull(processService.handleCommand(host, command3));
 
         Command command4 = new Command();
         command4.setId(4);
@@ -355,7 +358,7 @@ public class ProcessServiceTest {
         command4.setCommandType(CommandType.REPEAT_RUNNING);
         command4.setProcessInstanceId(processInstanceId);
         Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command4));
+        Assert.assertNotNull(processService.handleCommand(host, command4));
 
         Command command5 = new Command();
         command5.setId(5);
@@ -374,7 +377,7 @@ public class ProcessServiceTest {
                 processDefinition.getGlobalParamList(),
                 CommandType.START_PROCESS,
                 processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\"");
-        ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
+        ProcessInstance processInstance1 = processService.handleCommand(host, command5);
         Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
 
         ProcessDefinition processDefinition1 = new ProcessDefinition();
@@ -399,7 +402,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
         Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
         Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command1));
+        Assert.assertNotNull(processService.handleCommand(host, command1));
 
         Command command6 = new Command();
         command6.setId(6);
@@ -410,7 +413,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
         Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
-        ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
+        ProcessInstance processInstance6 = processService.handleCommand(host, command6);
         Assert.assertTrue(processInstance6 != null);
 
         processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@@ -429,7 +432,7 @@ public class ProcessServiceTest {
         command7.setProcessDefinitionVersion(1);
         Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
-        ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
+        ProcessInstance processInstance8 = processService.handleCommand(host, command7);
         Assert.assertTrue(processInstance8 != null);
 
         ProcessDefinition processDefinition2 = new ProcessDefinition();
@@ -453,7 +456,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
-        ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
+        ProcessInstance processInstance10 = processService.handleCommand(host, command9);
         Assert.assertTrue(processInstance10 == null);
     }
 
@@ -494,7 +497,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
 
         // will throw exception when command id is 0 and delete fail
-        processService.handleCommand(logger, host, command1);
+        processService.handleCommand(host, command1);
     }
 
     @Test
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 4e1958c671..2ada887868 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -22,13 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
+import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
  * to master/worker task transport
  */
-public class TaskExecutionContext {
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskExecutionContext implements Serializable {
+
+    private static final long serialVersionUID = -1L;
 
     /**
      * task id
@@ -248,399 +260,4 @@ public class TaskExecutionContext {
      * max memory
      */
     private Integer memoryMax;
-
-    public String getTaskLogName() {
-        return taskLogName;
-    }
-
-    public void setTaskLogName(String taskLogName) {
-        this.taskLogName = taskLogName;
-    }
-
-    public Map<String, String> getResources() {
-        return resources;
-    }
-
-    public void setResources(Map<String, String> resources) {
-        this.resources = resources;
-    }
-
-    public Map<String, Property> getParamsMap() {
-        return paramsMap;
-    }
-
-    public void setParamsMap(Map<String, Property> paramsMap) {
-        this.paramsMap = paramsMap;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public String getTaskName() {
-        return taskName;
-    }
-
-    public void setTaskName(String taskName) {
-        this.taskName = taskName;
-    }
-
-    public Date getFirstSubmitTime() {
-        return firstSubmitTime;
-    }
-
-    public void setFirstSubmitTime(Date firstSubmitTime) {
-        this.firstSubmitTime = firstSubmitTime;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public String getTaskType() {
-        return taskType;
-    }
-
-    public void setTaskType(String taskType) {
-        this.taskType = taskType;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public String getExecutePath() {
-        return executePath;
-    }
-
-    public void setExecutePath(String executePath) {
-        this.executePath = executePath;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
-    public String getTaskJson() {
-        return taskJson;
-    }
-
-    public void setTaskJson(String taskJson) {
-        this.taskJson = taskJson;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
-
-    public Date getScheduleTime() {
-        return scheduleTime;
-    }
-
-    public void setScheduleTime(Date scheduleTime) {
-        this.scheduleTime = scheduleTime;
-    }
-
-    public String getGlobalParams() {
-        return globalParams;
-    }
-
-    public void setGlobalParams(String globalParams) {
-        this.globalParams = globalParams;
-    }
-
-    public int getExecutorId() {
-        return executorId;
-    }
-
-    public void setExecutorId(int executorId) {
-        this.executorId = executorId;
-    }
-
-    public int getCmdTypeIfComplement() {
-        return cmdTypeIfComplement;
-    }
-
-    public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
-        this.cmdTypeIfComplement = cmdTypeIfComplement;
-    }
-
-    public String getTenantCode() {
-        return tenantCode;
-    }
-
-    public void setTenantCode(String tenantCode) {
-        this.tenantCode = tenantCode;
-    }
-
-    public String getQueue() {
-        return queue;
-    }
-
-    public void setQueue(String queue) {
-        this.queue = queue;
-    }
-
-    public int getProcessDefineId() {
-        return processDefineId;
-    }
-
-    public void setProcessDefineId(int processDefineId) {
-        this.processDefineId = processDefineId;
-    }
-
-    public int getProjectId() {
-        return projectId;
-    }
-
-    public void setProjectId(int projectId) {
-        this.projectId = projectId;
-    }
-
-    public String getTaskParams() {
-        return taskParams;
-    }
-
-    public void setTaskParams(String taskParams) {
-        this.taskParams = taskParams;
-    }
-
-    public String getEnvFile() {
-        return envFile;
-    }
-
-    public void setEnvFile(String envFile) {
-        this.envFile = envFile;
-    }
-
-    public String getEnvironmentConfig() {
-        return environmentConfig;
-    }
-
-    public void setEnvironmentConfig(String config) {
-        this.environmentConfig = config;
-    }
-
-    public Map<String, String> getDefinedParams() {
-        return definedParams;
-    }
-
-    public void setDefinedParams(Map<String, String> definedParams) {
-        this.definedParams = definedParams;
-    }
-
-    public String getTaskAppId() {
-        return taskAppId;
-    }
-
-    public void setTaskAppId(String taskAppId) {
-        this.taskAppId = taskAppId;
-    }
-
-    public TaskTimeoutStrategy getTaskTimeoutStrategy() {
-        return taskTimeoutStrategy;
-    }
-
-    public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) {
-        this.taskTimeoutStrategy = taskTimeoutStrategy;
-    }
-
-    public int getTaskTimeout() {
-        return taskTimeout;
-    }
-
-    public void setTaskTimeout(int taskTimeout) {
-        this.taskTimeout = taskTimeout;
-    }
-
-    public String getWorkerGroup() {
-        return workerGroup;
-    }
-
-    public void setWorkerGroup(String workerGroup) {
-        this.workerGroup = workerGroup;
-    }
-
-    public int getDelayTime() {
-        return delayTime;
-    }
-
-    public void setDelayTime(int delayTime) {
-        this.delayTime = delayTime;
-    }
-
-    public ResourceParametersHelper getResourceParametersHelper() {
-        return resourceParametersHelper;
-    }
-
-    public void setResourceParametersHelper(ResourceParametersHelper resourceParametersHelper) {
-        this.resourceParametersHelper = resourceParametersHelper;
-    }
-
-    public String getVarPool() {
-        return varPool;
-    }
-
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public int getDryRun() {
-        return dryRun;
-    }
-
-    public void setDryRun(int dryRun) {
-        this.dryRun = dryRun;
-    }
-
-    public Long getProcessDefineCode() {
-        return processDefineCode;
-    }
-
-    public void setProcessDefineCode(Long processDefineCode) {
-        this.processDefineCode = processDefineCode;
-    }
-
-    public int getProcessDefineVersion() {
-        return processDefineVersion;
-    }
-
-    public void setProcessDefineVersion(int processDefineVersion) {
-        this.processDefineVersion = processDefineVersion;
-    }
-
-    public long getProjectCode() {
-        return projectCode;
-    }
-
-    public void setProjectCode(long projectCode) {
-        this.projectCode = projectCode;
-    }
-
-    public DataQualityTaskExecutionContext getDataQualityTaskExecutionContext() {
-        return dataQualityTaskExecutionContext;
-    }
-
-    public void setDataQualityTaskExecutionContext(DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
-        this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
-    }
-
-    public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
-        this.currentExecutionStatus = currentExecutionStatus;
-    }
-
-    public ExecutionStatus getCurrentExecutionStatus() {
-        return currentExecutionStatus;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public Integer getCpuQuota() {
-        return cpuQuota;
-    }
-
-    public void setCpuQuota(Integer cpuQuota) {
-        this.cpuQuota = cpuQuota;
-    }
-
-    public Integer getMemoryMax() {
-        return memoryMax;
-    }
-
-    public void setMemoryMax(Integer memoryMax) {
-        this.memoryMax = memoryMax;
-    }
-
-    public K8sTaskExecutionContext getK8sTaskExecutionContext() {
-        return k8sTaskExecutionContext;
-    }
-
-    public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
-        this.k8sTaskExecutionContext = k8sTaskExecutionContext;
-    }
-
-    @Override
-    public String toString() {
-        return "TaskExecutionContext{"
-                + "taskInstanceId=" + taskInstanceId
-                + ", taskName='" + taskName + '\''
-                + ", currentExecutionStatus=" + currentExecutionStatus
-                + ", firstSubmitTime=" + firstSubmitTime
-                + ", startTime=" + startTime
-                + ", taskType='" + taskType + '\''
-                + ", host='" + host + '\''
-                + ", executePath='" + executePath + '\''
-                + ", logPath='" + logPath + '\''
-                + ", taskJson='" + taskJson + '\''
-                + ", processId=" + processId
-                + ", processDefineCode=" + processDefineCode
-                + ", processDefineVersion=" + processDefineVersion
-                + ", appIds='" + appIds + '\''
-                + ", processInstanceId=" + processInstanceId
-                + ", scheduleTime=" + scheduleTime
-                + ", globalParams='" + globalParams + '\''
-                + ", executorId=" + executorId
-                + ", cmdTypeIfComplement=" + cmdTypeIfComplement
-                + ", tenantCode='" + tenantCode + '\''
-                + ", queue='" + queue + '\''
-                + ", projectCode=" + projectCode
-                + ", taskParams='" + taskParams + '\''
-                + ", envFile='" + envFile + '\''
-                + ", dryRun='" + dryRun + '\''
-                + ", definedParams=" + definedParams
-                + ", taskAppId='" + taskAppId + '\''
-                + ", taskTimeoutStrategy=" + taskTimeoutStrategy
-                + ", taskTimeout=" + taskTimeout
-                + ", workerGroup='" + workerGroup + '\''
-                + ", environmentConfig='" + environmentConfig + '\''
-                + ", delayTime=" + delayTime
-                + ", resources=" + resources
-                + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
-                + ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
-                + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
-                + '}';
-    }
-
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index 82c8db12d4..411de30c34 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -78,6 +78,19 @@ public enum ExecutionStatus {
 
     private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
 
+    /**
+     * Ordinals of the states that need failover. Must list the same states as
+     * {@link #isNeedFailoverWorkflowInstanceState(ExecutionStatus)}; never hand this array out directly.
+     */
+    private static final int[] NEED_FAILOVER_STATES = new int[] {
+        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+        ExecutionStatus.DISPATCH.ordinal(),
+        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+        ExecutionStatus.DELAY_EXECUTION.ordinal(),
+        ExecutionStatus.READY_PAUSE.ordinal(),
+        ExecutionStatus.READY_STOP.ordinal()
+    };
+
     static {
         for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
             EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
@@ -180,4 +193,30 @@ public enum ExecutionStatus {
         }
         throw new IllegalArgumentException("invalid status : " + status);
     }
+
+    /**
+     * Whether the given status is one of the states that need failover.
+     * Must list the same states as {@code NEED_FAILOVER_STATES}.
+     *
+     * @param executionStatus status to check, may be null
+     * @return true if the status needs failover, false otherwise (including null)
+     */
+    public static boolean isNeedFailoverWorkflowInstanceState(ExecutionStatus executionStatus) {
+        return
+            ExecutionStatus.SUBMITTED_SUCCESS == executionStatus
+                || ExecutionStatus.DISPATCH == executionStatus
+                || ExecutionStatus.RUNNING_EXECUTION == executionStatus
+                || ExecutionStatus.DELAY_EXECUTION == executionStatus
+                || ExecutionStatus.READY_PAUSE == executionStatus
+                || ExecutionStatus.READY_STOP == executionStatus;
+    }
+
+    /**
+     * Returns the ordinals of the states that need failover.
+     *
+     * @return a fresh copy on each call, so callers cannot alter the shared state list
+     */
+    public static int[] getNeedFailoverWorkflowInstanceState() {
+        return NEED_FAILOVER_STATES.clone();
+    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
index 1e69d873ab..6c68cc00ec 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
@@ -22,22 +22,20 @@ import java.util.function.Supplier;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Metrics;
+import lombok.experimental.UtilityClass;
 
-public final class WorkerServerMetrics {
-
-    public WorkerServerMetrics() {
-        throw new UnsupportedOperationException("Utility class");
-    }
+@UtilityClass // Lombok: generates the private constructor and makes the class final
+public class WorkerServerMetrics {
 
     private static final Counter WORKER_OVERLOAD_COUNTER =
-            Counter.builder("ds.worker.overload.count")
-                    .description("overloaded workers count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.worker.overload.count")
+            .description("overloaded workers count")
+            .register(Metrics.globalRegistry);
 
     private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
-            Counter.builder("ds.worker.full.submit.queue.count")
-                    .description("full worker submit queues count")
-                    .register(Metrics.globalRegistry);
+        Counter.builder("ds.worker.full.submit.queue.count")
+            .description("full worker submit queues count")
+            .register(Metrics.globalRegistry);
 
     public static void incWorkerOverloadCount() {
         WORKER_OVERLOAD_COUNTER.increment();
@@ -49,8 +47,9 @@ public final class WorkerServerMetrics {
 
     public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
         Gauge.builder("ds.task.running", supplier)
-                .description("number of running tasks on workers")
-                .register(Metrics.globalRegistry);
+            .description("number of running tasks on workers")
+            .register(Metrics.globalRegistry);
 
     }
+
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 8264ea52f0..3f70974344 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
@@ -92,9 +93,6 @@ public class TaskCallbackService {
      * change remote channel
      */
     public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
-        if (REMOTE_CHANNELS.containsKey(taskInstanceId)) {
-            REMOTE_CHANNELS.remove(taskInstanceId);
-        }
         REMOTE_CHANNELS.put(taskInstanceId, channel);
     }
 
@@ -104,19 +102,19 @@ public class TaskCallbackService {
      * @param taskInstanceId taskInstanceId
      * @return callback channel
      */
-    private NettyRemoteChannel getRemoteChannel(int taskInstanceId) {
+    private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
         Channel newChannel;
         NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
         if (nettyRemoteChannel != null) {
             if (nettyRemoteChannel.isActive()) {
-                return nettyRemoteChannel;
+                return Optional.of(nettyRemoteChannel);
             }
             newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
             if (newChannel != null) {
-                return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+                return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
             }
         }
-        return null;
+        return Optional.empty();
     }
 
     public long pause(int ntries) {
@@ -151,15 +149,15 @@ public class TaskCallbackService {
      * @param command command
      */
     public void send(int taskInstanceId, Command command) {
-        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
-
+        Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
+        if (nettyRemoteChannel.isPresent()) {
+            nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
                 @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        // remove(taskInstanceId);
-                        return;
+                public void operationComplete(ChannelFuture future) {
+                    if (!future.isSuccess()) {
+                        // Pass the cause last so SLF4J logs it with its stack trace instead of dropping it.
+                        logger.error("Send callback command error, taskInstanceId: {}, command: {}",
+                            taskInstanceId, command, future.cause());
                     }
                 }
             });
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 533f40fa41..c14a2e891e 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -111,8 +111,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         }
         logger.info("task execute request command : {}", taskRequestCommand);
 
-        String contextJson = taskRequestCommand.getTaskExecutionContext();
-        TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
+        TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext(); // typed; no JSON parse step
 
         if (taskExecutionContext == null) {
             logger.error("task execution context is null");
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index 03df8de451..f691f7c8de 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -34,7 +35,7 @@ import org.springframework.stereotype.Component;
  * Retry Report Task Status Thread
  */
 @Component
-public class RetryReportTaskStatusThread implements Runnable {
+public class RetryReportTaskStatusThread extends BaseDaemonThread {
 
     private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
 
@@ -46,11 +47,14 @@ public class RetryReportTaskStatusThread implements Runnable {
     @Autowired
     private TaskCallbackService taskCallbackService;
 
-    public void start() {
+    protected RetryReportTaskStatusThread() {
+        super("RetryReportTaskStatusThread");
+    }
+
+    @Override
+    public synchronized void start() {
         logger.info("Retry report task status thread starting");
-        Thread thread = new Thread(this, "RetryReportTaskStatusThread");
-        thread.setDaemon(true);
-        thread.start();
+        super.start();
         logger.info("Retry report task status thread started");
     }
 
@@ -59,7 +63,7 @@ public class RetryReportTaskStatusThread implements Runnable {
      */
     @Override
     public void run() {
-        ResponseCache instance = ResponseCache.get();
+        final ResponseCache instance = ResponseCache.get();
 
         while (Stopper.isRunning()) {
 
@@ -67,33 +71,57 @@ public class RetryReportTaskStatusThread implements Runnable {
             ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
 
             try {
-                if (!instance.getRunningCache().isEmpty()) {
-                    Map<Integer, Command> runningCache = instance.getRunningCache();
-                    for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
-                        Integer taskInstanceId = entry.getKey();
-                        Command runningCommand = entry.getValue();
-                        taskCallbackService.send(taskInstanceId, runningCommand);
-                    }
+                // TODO: only retry commands whose previous send failed; currently every cached command is resent
+                retryRunningCommand(instance);
+                retryResponseCommand(instance);
+                retryRecallCommand(instance);
+            } catch (Exception e) {
+                logger.warn("Retry report task status error", e);
+            }
+        }
+    }
+
+    private void retryRunningCommand(ResponseCache instance) {
+        if (!instance.getRunningCache().isEmpty()) {
+            Map<Integer, Command> runningCache = instance.getRunningCache();
+            for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
+                Integer taskInstanceId = entry.getKey();
+                Command runningCommand = entry.getValue();
+                try {
+                    taskCallbackService.send(taskInstanceId, runningCommand);
+                } catch (Exception ex) {
+                    logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand, ex);
                 }
+            }
+        }
+    }
 
-                if (!instance.getResponseCache().isEmpty()) {
-                    Map<Integer, Command> responseCache = instance.getResponseCache();
-                    for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
-                        Integer taskInstanceId = entry.getKey();
-                        Command responseCommand = entry.getValue();
-                        taskCallbackService.send(taskInstanceId, responseCommand);
-                    }
+    private void retryResponseCommand(ResponseCache instance) {
+        if (!instance.getResponseCache().isEmpty()) {
+            Map<Integer, Command> responseCache = instance.getResponseCache();
+            for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
+                Integer taskInstanceId = entry.getKey();
+                Command responseCommand = entry.getValue();
+                try {
+                    taskCallbackService.send(taskInstanceId, responseCommand);
+                } catch (Exception ex) {
+                    logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand, ex);
                 }
-                if (!instance.getRecallCache().isEmpty()) {
-                    Map<Integer, Command> recallCache = instance.getRecallCache();
-                    for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
-                        Integer taskInstanceId = entry.getKey();
-                        Command responseCommand = entry.getValue();
-                        taskCallbackService.send(taskInstanceId, responseCommand);
-                    }
+            }
+        }
+    }
+
+    private void retryRecallCommand(ResponseCache instance) {
+        if (!instance.getRecallCache().isEmpty()) {
+            Map<Integer, Command> recallCache = instance.getRecallCache();
+            for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
+                Integer taskInstanceId = entry.getKey();
+                Command responseCommand = entry.getValue();
+                try {
+                    taskCallbackService.send(taskInstanceId, responseCommand);
+                } catch (Exception ex) {
+                    logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand, ex);
                 }
-            } catch (Exception e) {
-                logger.warn("Retry report task status error", e);
             }
         }
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 9349056507..7d49005395 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -132,9 +132,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
             taskExecutionContext.setEndTime(new Date());
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
             taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+            logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success", // ids carried in the message itself
+                taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
             return;
         }
-
         try {
             LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 13c533f1cf..2187a19281 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -88,7 +88,7 @@ public class TaskExecuteProcessorTest {
         command = new Command();
         command.setType(CommandType.TASK_EXECUTE_REQUEST);
         ackCommand = new TaskExecuteRunningCommand().convert2Command();
-        taskRequestCommand = new TaskExecuteRequestCommand();
+        taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext); // NOTE(review): confirm context is built before here
         alertClientService = PowerMockito.mock(AlertClientService.class);
         workerExecService = PowerMockito.mock(ExecutorService.class);
         PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
@@ -127,8 +127,6 @@ public class TaskExecuteProcessorTest {
         PowerMockito.mockStatic(JSONUtils.class);
         PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
                 .thenReturn(taskRequestCommand);
-        PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
-                .thenReturn(taskExecutionContext);
 
         PowerMockito.mockStatic(FileUtils.class);
         PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),