You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:48 UTC

[dolphinscheduler] 21/29: [Fix-10842] Fix master/worker failover will cause status incorrect (#10839)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 71edaf41a234f5c73e0c6266b6b8b927b146cd89
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Jul 9 11:54:59 2022 +0800

    [Fix-10842] Fix master/worker failover will cause status incorrect (#10839)
    
    * Fix master failover will not update task instance status
    * Add some failover log
    * Fix worker failover will rerun task more than once
    * Fix workflowInstance failover may rerun already success taskInstance
    
    (cherry picked from commit 3f69ec8f28c7206c153f5de8c7c0d1fa33d311f9)
---
 .../apache/dolphinscheduler/common/graph/DAG.java  |  11 +
 .../master/consumer/TaskPriorityQueueConsumer.java |  32 +-
 .../master/dispatch/context/ExecutionContext.java  |  45 +--
 .../dispatch/executor/NettyExecutorManager.java    |   3 +
 .../processor/queue/TaskExecuteRunnable.java       | 124 +++----
 .../registry/MasterRegistryDataListener.java       |   1 +
 .../master/runner/FailoverExecuteThread.java       |   5 +-
 .../master/runner/StateWheelExecuteThread.java     |  12 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 115 ++++---
 .../master/runner/task/CommonTaskProcessor.java    |   5 +-
 .../server/master/service/FailoverService.java     | 371 +--------------------
 .../master/service/MasterFailoverService.java      | 253 ++++++++++++++
 .../master/service/WorkerFailoverService.java      | 266 +++++++++++++++
 .../master/dispatch/ExecutionContextTestUtils.java |   2 +-
 .../executor/NettyExecutorManagerTest.java         |   4 +-
 .../server/master/service/FailoverServiceTest.java |  22 +-
 .../service/process/ProcessService.java            |   1 +
 .../service/process/ProcessServiceImpl.java        |  38 ++-
 .../service/queue/TaskPriority.java                |  41 ++-
 19 files changed, 823 insertions(+), 528 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
index e57b3dd93e..afa780163a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
@@ -498,5 +498,16 @@ public class DAG<Node, NodeInfo, EdgeInfo> {
         return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList);
     }
 
+    @Override
+    public String toString() {
+        return "DAG{"
+            + "nodesMap="
+            + nodesMap
+            + ", edgesMap="
+            + edgesMap
+            + ", reverseEdgesMap="
+            + reverseEdgesMap
+            + '}';
+    }
 }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f04ae15dd8..82198e942a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -34,11 +34,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -46,6 +46,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -187,8 +188,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
         TaskMetrics.incTaskDispatch();
         boolean result = false;
         try {
+            WorkflowExecuteRunnable workflowExecuteRunnable =
+                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+            if (workflowExecuteRunnable == null) {
+                logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
+                return true;
+            }
+            Optional<TaskInstance> taskInstanceOptional =
+                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+            if (!taskInstanceOptional.isPresent()) {
+                logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
+                    taskPriority);
+                // we return true, so that we will drop this task.
+                return true;
+            }
+            TaskInstance taskInstance = taskInstanceOptional.get();
             TaskExecutionContext context = taskPriority.getTaskExecutionContext();
-            ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup());
+            ExecutionContext executionContext =
+                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
 
             if (isTaskNeedToCheck(taskPriority)) {
                 if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@@ -196,16 +213,21 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
                     return true;
                 }
             }
+
             result = dispatcher.dispatch(executionContext);
 
             if (result) {
-                logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
+                logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
+                    taskPriority.getTaskId(),
+                    executionContext.getHost());
                 addDispatchEvent(context, executionContext);
             } else {
-                logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
+                logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
+                    taskPriority.getTaskId(),
+                    executionContext.getHost());
             }
         } catch (RuntimeException | ExecuteException e) {
-            logger.error("Master dispatch task to worker error: ", e);
+            logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
         }
         return result;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index b3fba87870..880640e91d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -19,13 +19,17 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
 
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
+import lombok.Data;
+
 /**
  *  execution context
  */
+@Data
 public class ExecutionContext {
 
     /**
@@ -34,51 +38,30 @@ public class ExecutionContext {
     private Host host;
 
     /**
-     *  command
+     * command
      */
     private final Command command;
 
+    private final TaskInstance taskInstance;
+
     /**
-     *  executor type : worker or client
+     * executor type : worker or client
      */
     private final ExecutorType executorType;
 
     /**
-     *  worker group
+     * worker group
      */
-    private String workerGroup;
+    private final String workerGroup;
 
-    public ExecutionContext(Command command, ExecutorType executorType) {
-        this(command, executorType, DEFAULT_WORKER_GROUP);
+    public ExecutionContext(Command command, ExecutorType executorType, TaskInstance taskInstance) {
+        this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
     }
 
-    public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
+    public ExecutionContext(Command command, ExecutorType executorType, String workerGroup, TaskInstance taskInstance) {
         this.command = command;
         this.executorType = executorType;
         this.workerGroup = workerGroup;
-    }
-
-    public Command getCommand() {
-        return command;
-    }
-
-    public ExecutorType getExecutorType() {
-        return executorType;
-    }
-
-    public void setWorkerGroup(String workerGroup) {
-        this.workerGroup = workerGroup;
-    }
-
-    public String getWorkerGroup() {
-        return this.workerGroup;
-    }
-
-    public Host getHost() {
-        return host;
-    }
-
-    public void setHost(Host host) {
-        this.host = host;
+        this.taskInstance = taskInstance;
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0ba24e287d..0cc2d4b8f7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -123,6 +123,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
                 doExecute(host, command);
                 success = true;
                 context.setHost(host);
+                // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
+                // is not belongs to the down worker ISSUE-10842.
+                context.getTaskInstance().setHost(host.getAddress());
             } catch (ExecuteException ex) {
                 logger.error(String.format("execute command : %s error", command), ex);
                 try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index f488fc3de1..d57adcbbbc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
@@ -75,7 +75,7 @@ public class TaskExecuteRunnable implements Runnable {
                 LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
                 persist(event);
             } catch (Exception e) {
-                logger.error("persist error, event:{}, error: {}", event, e);
+                logger.error("persist task event error, event:{}", event, e);
             } finally {
                 this.events.remove(event);
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -113,27 +113,29 @@ public class TaskExecuteRunnable implements Runnable {
      *
      * @param taskEvent taskEvent
      */
-    private void persist(TaskEvent taskEvent) {
+    private void persist(TaskEvent taskEvent) throws Exception {
         Event event = taskEvent.getEvent();
         int taskInstanceId = taskEvent.getTaskInstanceId();
         int processInstanceId = taskEvent.getProcessInstanceId();
 
         Optional<TaskInstance> taskInstance;
-        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        WorkflowExecuteRunnable workflowExecuteRunnable =
+            this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
         if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
             taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
         } else {
             taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
         }
 
+        boolean needToSendEvent = true;
         switch (event) {
             case DISPATCH:
-                handleDispatchEvent(taskEvent, taskInstance);
+                needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
                 // dispatch event do not need to submit state event
-                return;
+                break;
             case DELAY:
             case RUNNING:
-                handleRunningEvent(taskEvent, taskInstance);
+                needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
                 break;
             case RESULT:
                 handleResultEvent(taskEvent, taskInstance);
@@ -141,6 +143,10 @@ public class TaskExecuteRunnable implements Runnable {
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
+        if (!needToSendEvent) {
+            logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent);
+            return;
+        }
 
         StateEvent stateEvent = new StateEvent();
         stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
@@ -153,81 +159,83 @@ public class TaskExecuteRunnable implements Runnable {
     /**
      * handle dispatch event
      */
-    private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+    private boolean handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
         if (!taskInstanceOptional.isPresent()) {
             logger.error("taskInstance is null");
-            return;
+            return false;
         }
         TaskInstance taskInstance = taskInstanceOptional.get();
         if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
-            return;
+            return false;
         }
         taskInstance.setState(ExecutionStatus.DISPATCH);
         taskInstance.setHost(taskEvent.getWorkerAddress());
         processService.saveTaskInstance(taskInstance);
+        return true;
     }
 
     /**
      * handle running event
      */
-    private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+    private boolean handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
         Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstanceOptional.isPresent()) {
-                TaskInstance taskInstance = taskInstanceOptional.get();
-                if (taskInstance.getState().typeIsFinished()) {
-                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
-                } else {
-                    taskInstance.setState(taskEvent.getState());
-                    taskInstance.setStartTime(taskEvent.getStartTime());
-                    taskInstance.setHost(taskEvent.getWorkerAddress());
-                    taskInstance.setLogPath(taskEvent.getLogPath());
-                    taskInstance.setExecutePath(taskEvent.getExecutePath());
-                    taskInstance.setPid(taskEvent.getProcessId());
-                    taskInstance.setAppLink(taskEvent.getAppIds());
-                    processService.saveTaskInstance(taskInstance);
-                }
+        if (taskInstanceOptional.isPresent()) {
+            TaskInstance taskInstance = taskInstanceOptional.get();
+            if (taskInstance.getState().typeIsFinished()) {
+                logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}",
+                    taskInstance.getId(),
+                    taskInstance.getState());
+                return false;
+            } else {
+                taskInstance.setState(taskEvent.getState());
+                taskInstance.setStartTime(taskEvent.getStartTime());
+                taskInstance.setHost(taskEvent.getWorkerAddress());
+                taskInstance.setLogPath(taskEvent.getLogPath());
+                taskInstance.setExecutePath(taskEvent.getExecutePath());
+                taskInstance.setPid(taskEvent.getProcessId());
+                taskInstance.setAppLink(taskEvent.getAppIds());
+                processService.saveTaskInstance(taskInstance);
             }
-            // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
-            // send ack to worker
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker ack master error", e);
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         }
+        // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
+        // send ack to worker
+        TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
+            new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+        return true;
     }
 
     /**
      * handle result event
      */
-    private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+    private boolean handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
         Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstanceOptional.isPresent()) {
-                TaskInstance taskInstance = taskInstanceOptional.get();
-                dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
-
-                taskInstance.setStartTime(taskEvent.getStartTime());
-                taskInstance.setHost(taskEvent.getWorkerAddress());
-                taskInstance.setLogPath(taskEvent.getLogPath());
-                taskInstance.setExecutePath(taskEvent.getExecutePath());
-                taskInstance.setPid(taskEvent.getProcessId());
-                taskInstance.setAppLink(taskEvent.getAppIds());
-                taskInstance.setState(taskEvent.getState());
-                taskInstance.setEndTime(taskEvent.getEndTime());
-                taskInstance.setVarPool(taskEvent.getVarPool());
-                processService.changeOutParam(taskInstance);
-                processService.saveTaskInstance(taskInstance);
+        if (taskInstanceOptional.isPresent()) {
+            TaskInstance taskInstance = taskInstanceOptional.get();
+            if (taskInstance.getState().typeIsFinished()) {
+                logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent);
+                return false;
             }
-            // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker response master error", e);
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+
+            dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+            taskInstance.setStartTime(taskEvent.getStartTime());
+            taskInstance.setHost(taskEvent.getWorkerAddress());
+            taskInstance.setLogPath(taskEvent.getLogPath());
+            taskInstance.setExecutePath(taskEvent.getExecutePath());
+            taskInstance.setPid(taskEvent.getProcessId());
+            taskInstance.setAppLink(taskEvent.getAppIds());
+            taskInstance.setState(taskEvent.getState());
+            taskInstance.setEndTime(taskEvent.getEndTime());
+            taskInstance.setVarPool(taskEvent.getVarPool());
+            processService.changeOutParam(taskInstance);
+            processService.saveTaskInstance(taskInstance);
         }
+        // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
+        TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
+            new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+        channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+        return true;
     }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 30cea36db7..1eafb4cb54 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -64,6 +64,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
                 break;
             case REMOVE:
                 masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
+
                 break;
             default:
                 break;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 63f4215f27..16656c8760 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
+import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
      * failover service
      */
     @Autowired
-    private FailoverService failoverService;
+    private MasterFailoverService masterFailoverService;
 
     protected FailoverExecuteThread() {
         super("FailoverExecuteThread");
@@ -63,7 +64,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
             try {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
-                failoverService.checkMasterFailover();
+                masterFailoverService.checkMasterFailover();
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index c70fb1f1d4..843c1b246c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
@@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 
 import java.util.Optional;
@@ -296,15 +296,21 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 }
 
                 if (!taskInstanceOptional.isPresent()) {
-                    logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
+                    logger.warn(
+                        "Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
                     taskInstanceRetryCheckList.remove(taskInstanceKey);
                     continue;
                 }
 
                 TaskInstance taskInstance = taskInstanceOptional.get();
-                if (taskInstance.retryTaskIntervalOverTime()) {
+                // We check the status to avoid when we do worker failover we submit a failover task, this task may be resubmit by this
+                // thread
+                if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE
+                    && taskInstance.retryTaskIntervalOverTime()) {
                     // reset taskInstance endTime and state
                     // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+                    logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
+                        taskInstance.getId());
                     taskInstance.setEndTime(null);
                     taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 16604662ac..2763d99932 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -107,6 +107,7 @@ import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
 
 import com.google.common.collect.Lists;
 
@@ -140,7 +141,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
     /**
      * process instance
      */
-    private ProcessInstance processInstance;
+    private final ProcessInstance processInstance;
 
     /**
      * process definition
@@ -289,6 +290,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 StateEventHandler stateEventHandler =
                     StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                         .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
+                logger.info("Begin to handle state event, {}", stateEvent);
                 if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                     this.stateEvents.remove(stateEvent);
                 }
@@ -482,9 +484,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     public void refreshProcessInstance(int processInstanceId) {
         logger.info("process instance update: {}", processInstanceId);
-        processInstance = processService.findProcessInstanceById(processInstanceId);
+        ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId);
+        // just update the processInstance field(this is soft copy)
+        BeanUtils.copyProperties(newProcessInstance, processInstance);
+
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                                                                 processInstance.getProcessDefinitionVersion());
+            processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
     }
 
@@ -773,6 +778,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         }
         // generate process dag
         dag = DagHelper.buildDagGraph(processDag);
+        logger.info("Build dag success, dag: {}", dag);
     }
 
     /**
@@ -787,45 +793,60 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         errorTaskMap.clear();
 
         if (!isNewProcessInstance()) {
+            logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
+                processInstance.getRunTimes(),
+                processInstance.getRecovery());
             List<TaskInstance> validTaskInstanceList =
                 processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance task : validTaskInstanceList) {
-                if (validTaskMap.containsKey(task.getTaskCode())) {
-                    int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
-                    TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
-                    if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
-                        task.setFlag(Flag.NO);
-                        processService.updateTaskInstance(task);
-                        continue;
+                try {
+                    LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
+                    logger.info(
+                        "Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
+                        task.getTaskCode(),
+                        task.getState());
+                    if (validTaskMap.containsKey(task.getTaskCode())) {
+                        int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
+                        TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
+                        if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
+                            task.setFlag(Flag.NO);
+                            processService.updateTaskInstance(task);
+                            continue;
+                        }
+                        logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
+                            task.getTaskCode());
                     }
-                    logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
-                        task.getTaskCode());
-                }
 
-                validTaskMap.put(task.getTaskCode(), task.getId());
-                taskInstanceMap.put(task.getId(), task);
+                    validTaskMap.put(task.getTaskCode(), task.getId());
+                    taskInstanceMap.put(task.getId(), task);
 
-                if (task.isTaskComplete()) {
-                    completeTaskMap.put(task.getTaskCode(), task.getId());
-                    continue;
-                }
-                if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
-                    continue;
-                }
-                if (task.taskCanRetry()) {
-                    if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
-                        // tolerantTaskInstance add to standby list directly
-                        TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
-                        addTaskToStandByList(tolerantTaskInstance);
-                    } else {
-                        retryTaskInstance(task);
+                    if (task.isTaskComplete()) {
+                        completeTaskMap.put(task.getTaskCode(), task.getId());
+                        continue;
                     }
-                    continue;
-                }
-                if (task.getState().typeIsFailure()) {
-                    errorTaskMap.put(task.getTaskCode(), task.getId());
+                    if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
+                        dag)) {
+                        continue;
+                    }
+                    if (task.taskCanRetry()) {
+                        if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                            // tolerantTaskInstance add to standby list directly
+                            TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
+                            addTaskToStandByList(tolerantTaskInstance);
+                        } else {
+                            retryTaskInstance(task);
+                        }
+                        continue;
+                    }
+                    if (task.getState().typeIsFailure()) {
+                        errorTaskMap.put(task.getTaskCode(), task.getId());
+                    }
+                } finally {
+                    LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
+        } else {
+            logger.info("The current workflowInstance is a newly running workflowInstance");
         }
 
         if (processInstance.isComplementData() && complementListDate.isEmpty()) {
@@ -858,6 +879,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 }
             }
         }
+        logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
+            dependFailedTaskMap,
+            completeTaskMap,
+            errorTaskMap);
     }
 
     /**
@@ -902,6 +927,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
             taskInstanceMap.put(taskInstance.getId(), taskInstance);
             activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
+            boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
+            if (!dispatchSuccess) {
+                logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
+                    processInstance.getId(),
+                    processInstance.getName(),
+                    taskInstance.getId(),
+                    taskInstance.getName());
+                return Optional.empty();
+            }
             taskProcessor.action(TaskAction.RUN);
 
             stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
@@ -1835,14 +1869,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      * is new process instance
      */
     private boolean isNewProcessInstance() {
+        if (Flag.YES.equals(processInstance.getRecovery())) {
+            logger.info("This workInstance will be recover by this execution");
+            return false;
+        }
+
         if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) {
             return true;
-        } else if (processInstance.getRecovery().equals(Flag.YES)) {
-            // host is empty use old task instance
-            return false;
-        } else {
-            return false;
         }
+        logger.info(
+            "The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
+            processInstance.getState(),
+            processInstance.getRunTimes());
+        return false;
     }
 
     public Map<Long, Integer> getCompleteTaskMap() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index a19d08affc..0dec26defe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -67,7 +67,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 return true;
             }
         }
-        dispatchTask();
         return true;
     }
 
@@ -110,7 +109,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
                 return true;
             }
-            logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
+            logger.info("task ready to dispatch to worker: taskInstanceId: {}", taskInstance.getId());
 
             TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@@ -158,7 +157,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
             killCommand.setTaskInstanceId(taskInstance.getId());
 
-            ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
+            ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
 
             Host host = Host.of(taskInstance.getHost());
             executionContext.setHost(host);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 313a43c0e0..05a54e8529 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,46 +17,12 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
 import lombok.NonNull;
 
 /**
@@ -65,41 +31,14 @@ import lombok.NonNull;
 @Component
 public class FailoverService {
     private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
-    private final RegistryClient registryClient;
-    private final MasterConfig masterConfig;
-    private final ProcessService processService;
-    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
-    private final ProcessInstanceExecCacheManager cacheManager;
-    private final String localAddress;
-
-    public FailoverService(@NonNull RegistryClient registryClient,
-                           @NonNull MasterConfig masterConfig,
-                           @NonNull ProcessService processService,
-                           @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
-                           @NonNull ProcessInstanceExecCacheManager cacheManager) {
-        this.registryClient = registryClient;
-        this.masterConfig = masterConfig;
-        this.processService = processService;
-        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
-        this.cacheManager = cacheManager;
-        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
-    }
 
-    /**
-     * check master failover
-     */
-    @Counted(value = "failover_scheduler_check_task_count")
-    @Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
-    public void checkMasterFailover() {
-        List<String> hosts = getNeedFailoverMasterServers();
-        if (CollectionUtils.isEmpty(hosts)) {
-            return;
-        }
-        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
+    private final MasterFailoverService masterFailoverService;
+    private final WorkerFailoverService workerFailoverService;
 
-        for (String host : hosts) {
-            failoverMasterWithLock(host);
-        }
+    public FailoverService(@NonNull MasterFailoverService masterFailoverService,
+                           @NonNull WorkerFailoverService workerFailoverService) {
+        this.masterFailoverService = masterFailoverService;
+        this.workerFailoverService = workerFailoverService;
     }
 
     /**
@@ -111,304 +50,18 @@ public class FailoverService {
     public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
         switch (nodeType) {
             case MASTER:
-                failoverMasterWithLock(serverHost);
+                LOGGER.info("Master failover starting, masterServer: {}", serverHost);
+                masterFailoverService.failoverMaster(serverHost);
+                LOGGER.info("Master failover finished, masterServer: {}", serverHost);
                 break;
             case WORKER:
-                failoverWorker(serverHost);
+                LOGGER.info("Worker failover staring, workerServer: {}", serverHost);
+                workerFailoverService.failoverWorker(serverHost);
+                LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
                 break;
             default:
                 break;
         }
     }
 
-    private void failoverMasterWithLock(String masterHost) {
-        String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
-        try {
-            registryClient.getLock(failoverPath);
-            this.failoverMaster(masterHost);
-        } catch (Exception e) {
-            LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
-        } finally {
-            registryClient.releaseLock(failoverPath);
-        }
-    }
-
-    /**
-     * Failover master, will failover process instance and associated task instance.
-     * <p>When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,
-     * then the process instance will be failovered.
-     *
-     * @param masterHost master host
-     */
-    private void failoverMaster(String masterHost) {
-        if (StringUtils.isEmpty(masterHost)) {
-            return;
-        }
-        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
-        StopWatch failoverTimeCost = StopWatch.createStarted();
-        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
-        LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
-
-        // servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail.
-        List<Server> servers = registryClient.getServerList(NodeType.WORKER);
-        servers.addAll(registryClient.getServerList(NodeType.MASTER));
-
-        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
-            if (Constants.NULL.equals(processInstance.getHost())) {
-                continue;
-            }
-
-            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
-            for (TaskInstance taskInstance : validTaskInstanceList) {
-                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance, servers);
-            }
-
-            if (serverStartupTime != null && processInstance.getRestartTime() != null
-                    && processInstance.getRestartTime().after(serverStartupTime)) {
-                continue;
-            }
-
-            LOGGER.info("failover process instance id: {}", processInstance.getId());
-            ProcessInstanceMetrics.incProcessInstanceFailover();
-            //updateProcessInstance host is null and insert into command
-            processInstance.setHost(Constants.NULL);
-            processService.processNeedFailoverProcessInstances(processInstance);
-        }
-
-        failoverTimeCost.stop();
-        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
-    }
-
-    /**
-     * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
-     * and failover these tasks.
-     * <p>
-     * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
-     *
-     * @param workerHost worker host
-     */
-    private void failoverWorker(String workerHost) {
-        if (StringUtils.isEmpty(workerHost)) {
-            return;
-        }
-
-        long startTime = System.currentTimeMillis();
-        // we query the task instance from cache, so that we can directly update the cache
-        final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
-            .stream()
-            .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
-            .filter(taskInstance ->
-                workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
-            .collect(Collectors.toList());
-        final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
-        LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
-        final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
-        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
-            try {
-                ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
-                if (processInstance == null) {
-                    processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
-                    if (processInstance == null) {
-                        LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
-                            taskInstance.getProcessInstanceId(), taskInstance.getId());
-                        continue;
-                    }
-                    processInstanceCacheMap.put(processInstance.getId(), processInstance);
-                }
-
-                // only failover the task owned myself if worker down.
-                if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
-                    continue;
-                }
-
-                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance, workerServers);
-            } finally {
-                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
-            }
-        }
-        LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
-    }
-
-    /**
-     * failover task instance
-     * <p>
-     * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. try to notify local master
-     *
-     * @param processInstance
-     * @param taskInstance
-     * @param servers         if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
-     */
-    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
-        if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
-            LOGGER.info("The taskInstance doesn't need to failover");
-            return;
-        }
-        TaskMetrics.incTaskFailover();
-        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
-
-        taskInstance.setProcessInstance(processInstance);
-
-        if (!isMasterTask) {
-            LOGGER.info("The failover taskInstance is not master task");
-            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(processInstance)
-                .create();
-
-            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
-                // only kill yarn job if exists , the local thread has exited
-                ProcessUtils.killYarnJob(taskExecutionContext);
-            }
-        } else {
-            LOGGER.info("The failover taskInstance is a master task");
-        }
-
-        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-        processService.saveTaskInstance(taskInstance);
-
-        StateEvent stateEvent = new StateEvent();
-        stateEvent.setTaskInstanceId(taskInstance.getId());
-        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-        stateEvent.setProcessInstanceId(processInstance.getId());
-        stateEvent.setExecutionStatus(taskInstance.getState());
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
-    /**
-     * Get need failover master servers.
-     * <p>
-     * Query the process instances from database, if the processInstance's host doesn't exist in registry
-     * or the host is the currentServer, then it will need to failover.
-     *
-     * @return need failover master servers
-     */
-    private List<String> getNeedFailoverMasterServers() {
-        // failover myself && failover dead masters
-        List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
-
-        Iterator<String> iterator = hosts.iterator();
-        while (iterator.hasNext()) {
-            String host = iterator.next();
-            if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
-                if (!localAddress.equals(host)) {
-                    iterator.remove();
-                }
-            }
-        }
-        return hosts;
-    }
-
-    /**
-     * task needs failover if task start before server starts
-     *
-     * @param servers servers, can container master servers or worker servers
-     * @param taskInstance  task instance
-     * @return true if task instance need fail over
-     */
-    private boolean checkTaskInstanceNeedFailover(List<Server> servers, TaskInstance taskInstance) {
-
-        boolean taskNeedFailover = true;
-
-        if (taskInstance == null) {
-            LOGGER.error("Master failover task instance error, taskInstance is null");
-            return false;
-        }
-
-        if (Constants.NULL.equals(taskInstance.getHost())) {
-            return false;
-        }
-
-        if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
-            return false;
-        }
-
-        //now no host will execute this task instance,so no need to failover the task
-        if (taskInstance.getHost() == null) {
-            return false;
-        }
-
-        //if task start after server starts, there is no need to failover the task.
-        if (checkTaskAfterServerStart(servers, taskInstance)) {
-            taskNeedFailover = false;
-        }
-
-        return taskNeedFailover;
-    }
-
-    /**
-     * check task start after the worker server starts.
-     *
-     * @param servers servers, can contain master servers or worker servers
-     * @param taskInstance task instance
-     * @return true if task instance start time after server start date
-     */
-    private boolean checkTaskAfterServerStart(List<Server> servers, TaskInstance taskInstance) {
-        if (StringUtils.isEmpty(taskInstance.getHost())) {
-            return false;
-        }
-        Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost());
-        if (serverStartDate != null) {
-            if (taskInstance.getStartTime() == null) {
-                return taskInstance.getSubmitTime().after(serverStartDate);
-            } else {
-                return taskInstance.getStartTime().after(serverStartDate);
-            }
-        }
-        return false;
-    }
-
-    /**
-     * get failover lock path
-     *
-     * @param nodeType zookeeper node type
-     * @return fail over lock path
-     */
-    private String getFailoverLockPath(NodeType nodeType, String host) {
-        switch (nodeType) {
-            case MASTER:
-                return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
-            case WORKER:
-                return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
-            default:
-                return "";
-        }
-    }
-
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(NodeType nodeType, String host) {
-        if (StringUtils.isEmpty(host)) {
-            return null;
-        }
-        List<Server> servers = registryClient.getServerList(nodeType);
-        return getServerStartupTime(servers, host);
-    }
-
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(List<Server> servers, String host) {
-        if (CollectionUtils.isEmpty(servers)) {
-            return null;
-        }
-        Date serverStartupTime = null;
-        for (Server server : servers) {
-            if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
-                serverStartupTime = server.getCreateTime();
-                break;
-            }
-        }
-        return serverStartupTime;
-    }
-
-    public String getLocalAddress() {
-        return localAddress;
-    }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
new file mode 100644
index 0000000000..c7e5b4ea13
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+import lombok.NonNull;
+
+@Service
+public class MasterFailoverService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class);
+    private final RegistryClient registryClient;
+    private final MasterConfig masterConfig;
+    private final ProcessService processService;
+    private final String localAddress;
+
+    public MasterFailoverService(@NonNull RegistryClient registryClient,
+                                 @NonNull MasterConfig masterConfig,
+                                 @NonNull ProcessService processService) {
+        this.registryClient = registryClient;
+        this.masterConfig = masterConfig;
+        this.processService = processService;
+        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+
+    }
+
+    /**
+     * check master failover
+     */
+    @Counted(value = "ds.master.scheduler.failover.check.count")
+    @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
+    public void checkMasterFailover() {
+        List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
+            .stream()
+            // failover myself || dead server
+            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
+            .distinct()
+            .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
+            return;
+        }
+        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);
+
+        for (String needFailoverMasterHost : needFailoverMasterHosts) {
+            failoverMaster(needFailoverMasterHost);
+        }
+    }
+
+    public void failoverMaster(String masterHost) {
+        String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
+        try {
+            registryClient.getLock(failoverPath);
+            doFailoverMaster(masterHost);
+        } catch (Exception e) {
+            LOGGER.error("Master server failover failed, host:{}", masterHost, e);
+        } finally {
+            registryClient.releaseLock(failoverPath);
+        }
+    }
+
+    /**
+     * Failover master, will failover process instance and associated task instance.
+     * <p>When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,
+     * then the process instance will be failovered.
+     *
+     * @param masterHost master host
+     */
+    private void doFailoverMaster(@NonNull String masterHost) {
+        LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
+        StopWatch failoverTimeCost = StopWatch.createStarted();
+
+        Optional<Date> masterStartupTimeOptional =
+            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
+        List<ProcessInstance> needFailoverProcessInstanceList =
+            processService.queryNeedFailoverProcessInstances(masterHost);
+
+        LOGGER.info(
+            "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
+            masterHost,
+            needFailoverProcessInstanceList.size(),
+            needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
+
+        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+                LOGGER.info("WorkflowInstance failover starting");
+                if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
+                    LOGGER.info("WorkflowInstance doesn't need to failover");
+                    continue;
+                }
+                int processInstanceId = processInstance.getId();
+                List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
+                for (TaskInstance taskInstance : taskInstanceList) {
+                    try {
+                        LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
+                        LOGGER.info("TaskInstance failover starting");
+                        if (!checkTaskInstanceNeedFailover(taskInstance)) {
+                            LOGGER.info("The taskInstance doesn't need to failover");
+                            continue;
+                        }
+                        failoverTaskInstance(processInstance, taskInstance);
+                        LOGGER.info("TaskInstance failover finished");
+                    } finally {
+                        LoggerUtils.removeTaskInstanceIdMDC();
+                    }
+                }
+
+                ProcessInstanceMetrics.incProcessInstanceFailover();
+                // set the processInstance host to null to mark this processInstance as failed over
+                // and insert a failover command
+                processInstance.setHost(Constants.NULL);
+                processService.processNeedFailoverProcessInstances(processInstance);
+                LOGGER.info("WorkflowInstance failover finished");
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
+        }
+
+        failoverTimeCost.stop();
+        LOGGER.info("Master[{}] failover finished, useTime:{}ms",
+            masterHost,
+            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+    }
+
+    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
+        if (CollectionUtils.isEmpty(servers)) {
+            return Optional.empty();
+        }
+        Date serverStartupTime = null;
+        for (Server server : servers) {
+            if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
+                serverStartupTime = server.getCreateTime();
+                break;
+            }
+        }
+        return Optional.ofNullable(serverStartupTime);
+    }
+
+    /**
+     * failover task instance
+     * <p>
+     * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. try to notify local master
+     *
+     * @param processInstance
+     * @param taskInstance
+     */
+    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+        TaskMetrics.incTaskFailover();
+        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+        taskInstance.setProcessInstance(processInstance);
+
+        if (!isMasterTask) {
+            LOGGER.info("The failover taskInstance is not master task");
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(processInstance)
+                .create();
+
+            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+                // only kill the yarn job if it exists; the local thread has already exited
+                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
+                ProcessUtils.killYarnJob(taskExecutionContext);
+            }
+        } else {
+            LOGGER.info("The failover taskInstance is a master task");
+        }
+
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        taskInstance.setFlag(Flag.NO);
+        processService.saveTaskInstance(taskInstance);
+    }
+
+    private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
+        if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
+            // The task is already finished, so we don't need to failover this task instance
+            return false;
+        }
+        return true;
+    }
+
+    private boolean checkProcessInstanceNeedFailover(Optional<Date> beFailoveredMasterStartupTimeOptional,
+                                                     @NonNull ProcessInstance processInstance) {
+        // The processInstance has already been failed over; since we hold a lock during master failover,
+        // we can guarantee the host will not be set concurrently.
+        if (Constants.NULL.equals(processInstance.getHost())) {
+            return false;
+        }
+        if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
+            // the master is not active, so we can failover all of its processInstances
+            return true;
+        }
+        Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get();
+
+        if (processInstance.getStartTime().after(beFailoveredMasterStartupTime)) {
+            // The processInstance is newly created
+            return false;
+        }
+
+        return true;
+    }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
new file mode 100644
index 0000000000..ec126a3ec3
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
+import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import lombok.NonNull;
+
+@Service
+public class WorkerFailoverService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerFailoverService.class);
+
+    private final RegistryClient registryClient;
+    private final MasterConfig masterConfig;
+    private final ProcessService processService;
+    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private final ProcessInstanceExecCacheManager cacheManager;
+    private final String localAddress;
+
+    public WorkerFailoverService(@NonNull RegistryClient registryClient,
+                                 @NonNull MasterConfig masterConfig,
+                                 @NonNull ProcessService processService,
+                                 @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                                 @NonNull ProcessInstanceExecCacheManager cacheManager) {
+        this.registryClient = registryClient;
+        this.masterConfig = masterConfig;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.cacheManager = cacheManager;
+        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+    }
+
+    /**
+     * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
+     * and failover these tasks.
+     * <p>
+     * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
+     *
+     * @param workerHost worker host
+     */
+    public void failoverWorker(@NonNull String workerHost) {
+        LOGGER.info("Worker[{}] failover starting", workerHost);
+        final StopWatch failoverTimeCost = StopWatch.createStarted();
+
+        // get the failed worker's startup time (empty if the worker is still down)
+        final Optional<Date> needFailoverWorkerStartTime =
+            getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
+
+        final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
+        if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
+            LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
+            return;
+        }
+        LOGGER.info(
+            "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
+            workerHost,
+            needFailoverTaskInstanceList.size(),
+            needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
+        final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
+            try {
+                ProcessInstance processInstance =
+                    processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
+                        WorkflowExecuteRunnable workflowExecuteRunnable =
+                            cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
+                        if (workflowExecuteRunnable == null) {
+                            return null;
+                        }
+                        return workflowExecuteRunnable.getProcessInstance();
+                    });
+                if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
+                    LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
+                    continue;
+                }
+                LOGGER.info(
+                    "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
+                    workerHost);
+                failoverTaskInstance(processInstance, taskInstance);
+                LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
+            } catch (Exception ex) {
+                LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
+            } finally {
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+            }
+        }
+        failoverTimeCost.stop();
+        LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
+            workerHost,
+            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * failover task instance
+     * <p>
+     * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. try to notify local master
+     *
+     * @param processInstance
+     * @param taskInstance
+     */
+    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+
+        TaskMetrics.incTaskFailover();
+        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
+        taskInstance.setProcessInstance(processInstance);
+
+        if (!isMasterTask) {
+            LOGGER.info("The failover taskInstance is not master task");
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(processInstance)
+                .create();
+
+            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+                // only kill the yarn job if it exists; the local thread has already exited
+                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
+                ProcessUtils.killYarnJob(taskExecutionContext);
+            }
+        } else {
+            LOGGER.info("The failover taskInstance is a master task");
+        }
+
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        taskInstance.setFlag(Flag.NO);
+        processService.saveTaskInstance(taskInstance);
+
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(taskInstance.getState());
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * task needs failover if it started before the server started
+     *
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(Optional<Date> needFailoverWorkerStartTime,
+                                                  @Nullable ProcessInstance processInstance,
+                                                  TaskInstance taskInstance) {
+        if (processInstance == null) {
+            // This case shouldn't happen.
+            LOGGER.error(
+                "Failover task instance error, cannot find the related processInstance form memory, this case shouldn't happened");
+            return false;
+        }
+        if (taskInstance == null) {
+            // This case shouldn't happen.
+            LOGGER.error("Master failover task instance error, taskInstance is null, this case shouldn't happened");
+            return false;
+        }
+        // only failover the task owned myself if worker down.
+        if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
+            LOGGER.error(
+                "Master failover task instance error, the taskInstance's processInstance's host: {} is not the current master: {}",
+                processInstance.getHost(),
+                localAddress);
+            return false;
+        }
+        if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
+            // The taskInstance is already finished, doesn't need to failover
+            LOGGER.info("The task is already finished, doesn't need to failover");
+            return false;
+        }
+        if (!needFailoverWorkerStartTime.isPresent()) {
+            // The worker is still down
+            return true;
+        }
+        // The worker is active, may already send some new task to it
+        if (taskInstance.getSubmitTime() != null && taskInstance.getSubmitTime()
+            .after(needFailoverWorkerStartTime.get())) {
+            LOGGER.info(
+                "The taskInstance's submitTime: {} is after the need failover worker's start time: {}, the taskInstance is newly submit, it doesn't need to failover",
+                taskInstance.getSubmitTime(),
+                needFailoverWorkerStartTime.get());
+            return false;
+        }
+
+        return true;
+    }
+
+    private List<TaskInstance> getNeedFailoverTaskInstance(@NonNull String failoverWorkerHost) {
+        // we query the task instance from cache, so that we can directly update the cache
+        return cacheManager.getAll()
+            .stream()
+            .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
+            // select taskInstances assigned to the failed worker whose state still needs failover
+            .filter(taskInstance -> failoverWorkerHost.equals(taskInstance.getHost())
+                && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
+            .collect(Collectors.toList());
+    }
+
+    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
+        if (CollectionUtils.isEmpty(servers)) {
+            return Optional.empty();
+        }
+        Date serverStartupTime = null;
+        for (Server server : servers) {
+            if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
+                serverStartupTime = server.getCreateTime();
+                break;
+            }
+        }
+        return Optional.ofNullable(serverStartupTime);
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index b2c7d63cd7..55dd236f71 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -52,7 +52,7 @@ public class ExecutionContextTestUtils {
         TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
         Command command = requestCommand.convert2Command();
 
-        ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
+        ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);
         executionContext.setHost(Host.of(NetUtils.getAddr(port)));
 
         return executionContext;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index fdd79552b1..1ee890faf2 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -70,7 +70,7 @@ public class NettyExecutorManagerTest {
                 .buildProcessInstanceRelatedInfo(processInstance)
                 .buildProcessDefinitionRelatedInfo(processDefinition)
                 .create();
-        ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
+        ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
         executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
         Boolean execute = nettyExecutorManager.execute(executionContext);
         Assert.assertTrue(execute);
@@ -89,7 +89,7 @@ public class NettyExecutorManagerTest {
                 .buildProcessInstanceRelatedInfo(processInstance)
                 .buildProcessDefinitionRelatedInfo(processDefinition)
                 .create();
-        ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
+        ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
         executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
         nettyExecutorManager.execute(executionContext);
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 98bf514730..44e5a382f6 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -26,13 +26,14 @@ import static org.mockito.Mockito.doNothing;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -98,9 +99,17 @@ public class FailoverServiceTest {
         springApplicationContext.setApplicationContext(applicationContext);
 
         given(masterConfig.getListenPort()).willReturn(masterPort);
-        failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
+        MasterFailoverService masterFailoverService =
+            new MasterFailoverService(registryClient, masterConfig, processService);
+        WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
+            masterConfig,
+            processService,
+            workflowExecuteThreadPool,
+            cacheManager);
+
+        failoverService = new FailoverService(masterFailoverService, workerFailoverService);
 
-        testMasterHost = failoverService.getLocalAddress();
+        testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
         String ip = testMasterHost.split(":")[0];
         int port = Integer.valueOf(testMasterHost.split(":")[1]);
         Assert.assertEquals(masterPort, port);
@@ -118,6 +127,7 @@ public class FailoverServiceTest {
         processInstance = new ProcessInstance();
         processInstance.setId(1);
         processInstance.setHost(testMasterHost);
+        processInstance.setStartTime(new Date());
         processInstance.setRestartTime(new Date());
         processInstance.setHistoryCmd("xxx");
         processInstance.setCommandType(CommandType.STOP);
@@ -154,16 +164,10 @@ public class FailoverServiceTest {
 
         given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer)));
         given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer)));
-        ReflectionTestUtils.setField(failoverService, "registryClient", registryClient);
 
         doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
     }
 
-    @Test
-    public void checkMasterFailoverTest() {
-        failoverService.checkMasterFailover();
-    }
-
     @Test
     public void failoverMasterTest() {
         processInstance.setHost(Constants.NULL);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 4bfb0b4809..f61676e693 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.DagData;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index c527276933..5624f47812 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -126,7 +126,6 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -266,10 +265,12 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private TaskPluginManager taskPluginManager;
 
+    @Autowired
+    private ProcessService processService;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
-     * @param logger  logger
      * @param host    host
      * @param command found command
      * @return process instance
@@ -904,7 +905,8 @@ public class ProcessServiceImpl implements ProcessService {
         ProcessDefinition processDefinition;
         CommandType commandType = command.getCommandType();
 
-        processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
+        processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(),
+                                                       command.getProcessDefinitionVersion());
         if (processDefinition == null) {
             logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
             return null;
@@ -1003,6 +1005,7 @@ public class ProcessServiceImpl implements ProcessService {
             case RECOVER_TOLERANCE_FAULT_PROCESS:
                 // recover tolerance fault process
                 processInstance.setRecovery(Flag.YES);
+                processInstance.setRunTimes(runTime + 1);
                 runStatus = processInstance.getState();
                 break;
             case COMPLEMENT_DATA:
@@ -1241,11 +1244,15 @@ public class ProcessServiceImpl implements ProcessService {
         while (retryTimes <= commitRetryTimes) {
             try {
                 // submit task to db
-                task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
+                // Call through the self-injected Spring proxy so @Transactional on submitTask takes effect (a direct this-call would bypass the proxy)
+                task = processService.submitTask(processInstance, taskInstance);
                 if (task != null && task.getId() != 0) {
                     break;
                 }
-                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
+                logger.error(
+                    "task commit to db failed , taskId {} has already retry {} times, please check the database",
+                    taskInstance.getId(),
+                    retryTimes);
                 Thread.sleep(commitInterval);
             } catch (Exception e) {
                 logger.error("task commit to db failed", e);
@@ -1267,13 +1274,17 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
-        logger.info("start submit task : {}, processInstance id:{}, state: {}",
-            taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
+        logger.info("Start save taskInstance to database : {}, processInstance id:{}, state: {}",
+                    taskInstance.getName(),
+                    taskInstance.getProcessInstanceId(),
+                    processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
         if (task == null) {
-            logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
-                taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
+            logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
+                         taskInstance.getName(),
+                         taskInstance.getProcessInstance(),
+                         processInstance.getState());
             return null;
         }
 
@@ -1281,8 +1292,13 @@ public class ProcessServiceImpl implements ProcessService {
             createSubWorkProcess(processInstance, task);
         }
 
-        logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {}  ",
-            taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
+        logger.info(
+            "End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
+            taskInstance.getId(),
+            taskInstance.getName(),
+            task.getState(),
+            processInstance.getId(),
+            processInstance.getState());
         return task;
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 0aeec4609d..823ec81d3c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -211,16 +211,45 @@ public class TaskPriority implements Comparable<TaskPriority> {
         }
         TaskPriority that = (TaskPriority) o;
         return processInstancePriority == that.processInstancePriority
-                && processInstanceId == that.processInstanceId
-                && taskInstancePriority == that.taskInstancePriority
-                && taskId == that.taskId
-                && taskGroupPriority == that.taskGroupPriority
-                && Objects.equals(groupName, that.groupName);
+            && processInstanceId == that.processInstanceId
+            && taskInstancePriority == that.taskInstancePriority
+            && taskId == that.taskId
+            && taskGroupPriority == that.taskGroupPriority
+            && Objects.equals(groupName, that.groupName);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName);
+        return Objects.hash(processInstancePriority,
+            processInstanceId,
+            taskInstancePriority,
+            taskId,
+            taskGroupPriority,
+            groupName);
     }
 
+    @Override
+    public String toString() {
+        return "TaskPriority{"
+            + "processInstancePriority="
+            + processInstancePriority
+            + ", processInstanceId="
+            + processInstanceId
+            + ", taskInstancePriority="
+            + taskInstancePriority
+            + ", taskId="
+            + taskId
+            + ", taskExecutionContext="
+            + taskExecutionContext
+            + ", groupName='"
+            + groupName
+            + '\''
+            + ", context="
+            + context
+            + ", checkpoint="
+            + checkpoint
+            + ", taskGroupPriority="
+            + taskGroupPriority
+            + '}';
+    }
 }