You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:29 UTC

[dolphinscheduler] 02/29: Optimize MasterServer, add MasterRPCService (#10371)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 81cadd15d2b7058eb94f84ad29d25f71324d4253
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 9 10:00:05 2022 +0800

    Optimize MasterServer, add MasterRPCService (#10371)
    
    * Optimize MasterServer, avoid NPE
    
    (cherry picked from commit 3ecbee3885090b987e120667e1ccf6f6c24d1df8)
---
 .../apache/dolphinscheduler/common/Constants.java  |   2 +-
 .../server/master/MasterServer.java                |  66 ++---------
 .../cache/ProcessInstanceExecCacheManager.java     |   8 +-
 .../impl/ProcessInstanceExecCacheManagerImpl.java  |  10 +-
 .../processor/queue/StateEventResponseService.java |   4 +-
 ...ExecuteThread.java => TaskExecuteRunnable.java} |  37 +++---
 .../processor/queue/TaskExecuteThreadPool.java     |  16 +--
 .../registry/MasterConnectionStateListener.java    |  62 ++++++++++
 .../master/registry/MasterRegistryClient.java      |  84 ++++---------
 .../MasterRPCServer.java}                          | 130 ++++-----------------
 .../server/master/runner/EventExecuteService.java  |   2 +-
 .../master/runner/FailoverExecuteThread.java       |   2 +
 .../master/runner/MasterSchedulerService.java      |   2 +-
 .../master/runner/StateWheelExecuteThread.java     |  24 ++--
 ...uteThread.java => WorkflowExecuteRunnable.java} | 100 ++++++++--------
 .../master/runner/WorkflowExecuteThreadPool.java   |  10 +-
 .../server/master/service/FailoverService.java     |  33 ++++--
 ...hreadTest.java => WorkflowExecuteTaskTest.java} |  19 ++-
 .../ProcessInstanceExecCacheManagerImplTest.java   |  12 +-
 .../master/registry/MasterRegistryClientTest.java  |   7 --
 .../server/registry/HeartBeatTask.java             |   2 +-
 .../service/registry/RegistryClient.java           |   3 +-
 22 files changed, 273 insertions(+), 362 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 860e493ee1..0382b53c2c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -795,7 +795,7 @@ public final class Constants {
      */
     public static final String PSTREE = "pstree";
 
-    public static final Boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
+    public static final boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
 
     /**
      * dry run flag
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6ab0d4e51a..cb01e14504 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,18 +20,8 @@ package org.apache.dolphinscheduler.server.master;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.remote.NettyRemotingServer;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
-import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
@@ -58,14 +48,9 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
 public class MasterServer implements IStoppable {
     private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
 
-    @Autowired
-    private MasterConfig masterConfig;
-
     @Autowired
     private SpringApplicationContext springApplicationContext;
 
-    private NettyRemotingServer nettyRemotingServer;
-
     @Autowired
     private MasterRegistryClient masterRegistryClient;
 
@@ -78,24 +63,6 @@ public class MasterServer implements IStoppable {
     @Autowired
     private Scheduler scheduler;
 
-    @Autowired
-    private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
-
-    @Autowired
-    private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
-
-    @Autowired
-    private TaskEventProcessor taskEventProcessor;
-
-    @Autowired
-    private StateEventProcessor stateEventProcessor;
-
-    @Autowired
-    private CacheProcessor cacheProcessor;
-
-    @Autowired
-    private TaskKillResponseProcessor taskKillResponseProcessor;
-
     @Autowired
     private EventExecuteService eventExecuteService;
 
@@ -103,7 +70,7 @@ public class MasterServer implements IStoppable {
     private FailoverExecuteThread failoverExecuteThread;
 
     @Autowired
-    private LoggerRequestProcessor loggerRequestProcessor;
+    private MasterRPCServer masterRPCServer;
 
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
@@ -115,25 +82,8 @@ public class MasterServer implements IStoppable {
      */
     @PostConstruct
     public void run() throws SchedulerException {
-        // init remoting server
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(masterConfig.getListenPort());
-        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
-
-        // logger server
-        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
-
-        this.nettyRemotingServer.start();
+        // init rpc server
+        this.masterRPCServer.start();
 
         // install task plugin
         this.taskPluginManager.installPlugin();
@@ -168,6 +118,7 @@ public class MasterServer implements IStoppable {
         try {
             // execute only once
             if (Stopper.isStopped()) {
+                logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
                 return;
             }
 
@@ -184,10 +135,13 @@ public class MasterServer implements IStoppable {
             }
             // close
             this.masterSchedulerService.close();
-            this.nettyRemotingServer.close();
+            this.masterRPCServer.close();
             this.masterRegistryClient.closeRegistry();
-            // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+            // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
+            // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
             springApplicationContext.close();
+
+            logger.info("MasterServer stopped...");
         } catch (Exception e) {
             logger.error("master server stop exception ", e);
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
index 5fdf64493c..2f5f6dc472 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.cache;
 
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import java.util.Collection;
 
@@ -32,7 +32,7 @@ public interface ProcessInstanceExecCacheManager {
      * @param processInstanceId processInstanceId
      * @return WorkflowExecuteThread
      */
-    WorkflowExecuteThread getByProcessInstanceId(int processInstanceId);
+    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);
 
     /**
      * judge the process instance does it exist
@@ -55,12 +55,12 @@ public interface ProcessInstanceExecCacheManager {
      * @param processInstanceId     processInstanceId
      * @param workflowExecuteThread if it is null, will not be cached
      */
-    void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread);
+    void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
 
     /**
      * get all WorkflowExecuteThread from cache
      *
      * @return all WorkflowExecuteThread in cache
      */
-    Collection<WorkflowExecuteThread> getAll();
+    Collection<WorkflowExecuteRunnable> getAll();
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
index 1d0ab4841a..137bd4ecc5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.server.master.cache.impl;
 
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,10 +33,10 @@ import com.google.common.collect.ImmutableList;
 @Component
 public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
 
-    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
 
     @Override
-    public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) {
+    public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
         return processInstanceExecMaps.get(processInstanceId);
     }
 
@@ -51,7 +51,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
     }
 
     @Override
-    public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) {
+    public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
         if (workflowExecuteThread == null) {
             return;
         }
@@ -59,7 +59,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
     }
 
     @Override
-    public Collection<WorkflowExecuteThread> getAll() {
+    public Collection<WorkflowExecuteRunnable> getAll() {
         return ImmutableList.copyOf(processInstanceExecMaps.values());
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index cba592c63d..83772a1054 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 
 import java.util.ArrayList;
@@ -135,7 +135,7 @@ public class StateEventResponseService {
                 return;
             }
 
-            WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+            WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
             switch (stateEvent.getType()) {
                 case TASK_STATE_CHANGE:
                     workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
similarity index 84%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 47b190e246..593567c6eb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -25,11 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
@@ -40,9 +41,9 @@ import io.netty.channel.Channel;
 /**
  * task execute thread
  */
-public class TaskExecuteThread {
+public class TaskExecuteRunnable implements Runnable {
 
-    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteRunnable.class);
 
     private final int processInstanceId;
 
@@ -56,8 +57,8 @@ public class TaskExecuteThread {
 
     private DataQualityResultOperator dataQualityResultOperator;
 
-    public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
-                             ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
+    public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                               ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
         this.processInstanceId = processInstanceId;
         this.processService = processService;
         this.workflowExecuteThreadPool = workflowExecuteThreadPool;
@@ -65,6 +66,7 @@ public class TaskExecuteThread {
         this.dataQualityResultOperator = dataQualityResultOperator;
     }
 
+    @Override
     public void run() {
         while (!this.events.isEmpty()) {
             TaskEvent event = this.events.peek();
@@ -113,12 +115,12 @@ public class TaskExecuteThread {
         int taskInstanceId = taskEvent.getTaskInstanceId();
         int processInstanceId = taskEvent.getProcessInstanceId();
 
-        TaskInstance taskInstance;
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
-            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+        Optional<TaskInstance> taskInstance;
+        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+        if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
+            taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
         } else {
-            taskInstance = processService.findTaskInstanceById(taskInstanceId);
+            taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
         }
 
         switch (event) {
@@ -148,11 +150,12 @@ public class TaskExecuteThread {
     /**
      * handle dispatch event
      */
-    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        if (taskInstance == null) {
+    private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
+        if (!taskInstanceOptional.isPresent()) {
             logger.error("taskInstance is null");
             return;
         }
+        TaskInstance taskInstance = taskInstanceOptional.get();
         if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
             return;
         }
@@ -164,10 +167,11 @@ public class TaskExecuteThread {
     /**
      * handle running event
      */
-    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+    private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
         Channel channel = taskEvent.getChannel();
         try {
-            if (taskInstance != null) {
+            if (taskInstanceOptional.isPresent()) {
+                TaskInstance taskInstance = taskInstanceOptional.get();
                 if (taskInstance.getState().typeIsFinished()) {
                     logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
                 } else {
@@ -194,10 +198,11 @@ public class TaskExecuteThread {
     /**
      * handle result event
      */
-    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+    private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
         Channel channel = taskEvent.getChannel();
         try {
-            if (taskInstance != null) {
+            if (taskInstanceOptional.isPresent()) {
+                TaskInstance taskInstance = taskInstanceOptional.get();
                 dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
 
                 taskInstance.setStartTime(taskEvent.getStartTime());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index c9c2868d36..323ea86411 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -40,7 +40,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
 
     private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
 
-    private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, TaskExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
 
     @Autowired
     private MasterConfig masterConfig;
@@ -67,7 +67,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * task event thread map
      */
-    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, TaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
 
     @PostConstruct
     private void init() {
@@ -83,26 +83,26 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             return;
         }
         if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
-            TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
+            TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable(
                     taskEvent.getProcessInstanceId(),
                     processService, workflowExecuteThreadPool,
                     processInstanceExecCacheManager,
                     dataQualityResultOperator);
             taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
         }
-        TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
-        if (taskExecuteThread != null) {
-            taskExecuteThread.addEvent(taskEvent);
+        TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
+        if (taskExecuteRunnable != null) {
+            taskExecuteRunnable.addEvent(taskEvent);
         }
     }
 
     public void eventHandler() {
-        for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
+        for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
             executeEvent(taskExecuteThread);
         }
     }
 
-    public void executeEvent(TaskExecuteThread taskExecuteThread) {
+    public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
         if (taskExecuteThread.eventSize() == 0) {
             return;
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
new file mode 100644
index 0000000000..bc1b217f9e
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterConnectionStateListener implements ConnectionListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
+
+    private final String masterNodePath;
+    private final RegistryClient registryClient;
+
+    public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient) {
+        this.masterNodePath = checkNotNull(masterNodePath);
+        this.registryClient = checkNotNull(registryClient);
+    }
+
+    @Override
+    public void onUpdate(ConnectionState state) {
+        switch (state) {
+            case CONNECTED:
+                logger.debug("registry connection state is {}", state);
+                break;
+            case SUSPENDED:
+                logger.warn("registry connection state is {}, ready to retry connection", state);
+                break;
+            case RECONNECTED:
+                logger.debug("registry connection state is {}, clean the node info", state);
+                registryClient.remove(masterNodePath);
+                registryClient.persistEphemeral(masterNodePath, "");
+                break;
+            case DISCONNECTED:
+                logger.warn("registry connection state is {}, ready to stop myself", state);
+                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+                break;
+            default:
+        }
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index c79f9b7071..5cf1c945af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -24,35 +24,18 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
-import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
-import java.time.Duration;
 import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -65,9 +48,8 @@ import org.springframework.stereotype.Component;
 import com.google.common.collect.Sets;
 
 /**
- * zookeeper master client
- * <p>
- * single instance
+ * <p>DolphinScheduler master registry client, used to connect to the registry and handle the registry events.
+ * <p>When the Master node starts up, it registers itself in the registry center and schedules a {@link HeartBeatTask} to update its metadata in the registry.
  */
 @Component
 public class MasterRegistryClient {
@@ -101,8 +83,10 @@ public class MasterRegistryClient {
      * master startup time, ms
      */
     private long startupTime;
+    private String masterAddress;
 
     public void init() {
+        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         this.startupTime = System.currentTimeMillis();
         this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
     }
@@ -111,11 +95,12 @@ public class MasterRegistryClient {
         try {
             // master registry
             registry();
-
+            registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(),
+                                                                                        registryClient));
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
             logger.error("master start up exception", e);
-            throw new RuntimeException("master start up error", e);
+            throw new RegistryException("Master registry client start up error", e);
         }
     }
 
@@ -198,24 +183,25 @@ public class MasterRegistryClient {
     }
 
     /**
-     * registry
+     * Register the current master server itself in the registry.
      */
-    public void registry() {
-        String address = NetUtils.getAddr(masterConfig.getListenPort());
-        String localNodePath = getMasterPath();
+    void registry() {
+        logger.info("master node : {} registering to registry center...", masterAddress);
+        String localNodePath = getCurrentNodePath();
         int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                masterConfig.getMaxCpuLoadAvg(),
-                masterConfig.getReservedMemory(),
-                Sets.newHashSet(getMasterPath()),
-                Constants.MASTER_TYPE,
-                registryClient);
+                                                        masterConfig.getMaxCpuLoadAvg(),
+                                                        masterConfig.getReservedMemory(),
+                                                        Sets.newHashSet(localNodePath),
+                                                        Constants.MASTER_TYPE,
+                                                        registryClient);
 
         // remove before persist
         registryClient.remove(localNodePath);
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
 
         while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
+            logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
 
@@ -225,37 +211,17 @@ public class MasterRegistryClient {
         // delete dead server
         registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
 
-        registryClient.addConnectionStateListener(this::handleConnectionState);
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
-
-    }
+        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s",
+                    masterAddress,
+                    masterHeartbeatInterval);
 
-    public void handleConnectionState(ConnectionState state) {
-        String localNodePath = getMasterPath();
-        switch (state) {
-            case CONNECTED:
-                logger.debug("registry connection state is {}", state);
-                break;
-            case SUSPENDED:
-                logger.warn("registry connection state is {}, ready to retry connection", state);
-                break;
-            case RECONNECTED:
-                logger.debug("registry connection state is {}, clean the node info", state);
-                registryClient.persistEphemeral(localNodePath, "");
-                break;
-            case DISCONNECTED:
-                logger.warn("registry connection state is {}, ready to stop myself", state);
-                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
-                break;
-            default:
-        }
     }
 
     public void deregister() {
         try {
             String address = getLocalAddress();
-            String localNodePath = getMasterPath();
+            String localNodePath = getCurrentNodePath();
             registryClient.remove(localNodePath);
             logger.info("master node : {} unRegistry to register center.", address);
             heartBeatExecutor.shutdown();
@@ -269,7 +235,7 @@ public class MasterRegistryClient {
     /**
      * get master path
      */
-    private String getMasterPath() {
+    private String getCurrentNodePath() {
         String address = getLocalAddress();
         return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
similarity index 51%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 6ab0d4e51a..e0b65dd077 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -15,11 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.rpc;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@@ -31,52 +28,27 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
-import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
-import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
 
 import javax.annotation.PostConstruct;
 
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cache.annotation.EnableCaching;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.transaction.annotation.EnableTransactionManagement;
-
-@SpringBootApplication
-@ComponentScan("org.apache.dolphinscheduler")
-@EnableTransactionManagement
-@EnableCaching
-public class MasterServer implements IStoppable {
-    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
+import org.springframework.stereotype.Service;
 
-    @Autowired
-    private MasterConfig masterConfig;
+/**
+ * Master RPC server, used to send requests to and receive requests from other systems.
+ */
+@Service
+public class MasterRPCServer implements AutoCloseable {
 
-    @Autowired
-    private SpringApplicationContext springApplicationContext;
+    private static final Logger logger = LoggerFactory.getLogger(MasterRPCServer.class);
 
     private NettyRemotingServer nettyRemotingServer;
 
     @Autowired
-    private MasterRegistryClient masterRegistryClient;
-
-    @Autowired
-    private TaskPluginManager taskPluginManager;
-
-    @Autowired
-    private MasterSchedulerService masterSchedulerService;
-
-    @Autowired
-    private Scheduler scheduler;
+    private MasterConfig masterConfig;
 
     @Autowired
     private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@@ -97,24 +69,13 @@ public class MasterServer implements IStoppable {
     private TaskKillResponseProcessor taskKillResponseProcessor;
 
     @Autowired
-    private EventExecuteService eventExecuteService;
-
-    @Autowired
-    private FailoverExecuteThread failoverExecuteThread;
+    private TaskRecallProcessor taskRecallProcessor;
 
     @Autowired
     private LoggerRequestProcessor loggerRequestProcessor;
 
-    public static void main(String[] args) {
-        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
-        SpringApplication.run(MasterServer.class);
-    }
-
-    /**
-     * run master server
-     */
     @PostConstruct
-    public void run() throws SchedulerException {
+    private void init() {
         // init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(masterConfig.getListenPort());
@@ -126,6 +87,7 @@ public class MasterServer implements IStoppable {
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
 
         // logger server
         this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
@@ -134,67 +96,19 @@ public class MasterServer implements IStoppable {
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
         this.nettyRemotingServer.start();
-
-        // install task plugin
-        this.taskPluginManager.installPlugin();
-
-        // self tolerant
-        this.masterRegistryClient.init();
-        this.masterRegistryClient.start();
-        this.masterRegistryClient.setRegistryStoppable(this);
-
-        this.masterSchedulerService.init();
-        this.masterSchedulerService.start();
-
-        this.eventExecuteService.start();
-        this.failoverExecuteThread.start();
-
-        this.scheduler.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            if (Stopper.isRunning()) {
-                close("shutdownHook");
-            }
-        }));
     }
 
-    /**
-     * gracefully close
-     *
-     * @param cause close cause
-     */
-    public void close(String cause) {
-
-        try {
-            // execute only once
-            if (Stopper.isStopped()) {
-                return;
-            }
-
-            logger.info("master server is stopping ..., cause : {}", cause);
-
-            // set stop signal is true
-            Stopper.stop();
-
-            try {
-                // thread sleep 3 seconds for thread quietly stop
-                Thread.sleep(3000L);
-            } catch (Exception e) {
-                logger.warn("thread sleep exception ", e);
-            }
-            // close
-            this.masterSchedulerService.close();
-            this.nettyRemotingServer.close();
-            this.masterRegistryClient.closeRegistry();
-            // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
-            springApplicationContext.close();
-        } catch (Exception e) {
-            logger.error("master server stop exception ", e);
-        }
+    public void start() {
+        logger.info("Starting Master RPC Server...");
+        this.nettyRemotingServer.start();
+        logger.info("Started Master RPC Server...");
     }
 
     @Override
-    public void stop(String cause) {
-        close(cause);
+    public void close() {
+        logger.info("Closing Master RPC Server...");
+        this.nettyRemotingServer.close();
+        logger.info("Closed Master RPC Server...");
     }
+
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 6f6f718c2f..fc9c4fd854 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -62,7 +62,7 @@ public class EventExecuteService extends Thread {
     }
 
     private void eventHandler() {
-        for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
+        for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
             workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index aa894f8b3b..5fd812f162 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -56,6 +56,8 @@ public class FailoverExecuteThread extends Thread {
         logger.info("failover execute thread started");
         while (Stopper.isRunning()) {
             try {
+                // todo: Do we need to schedule a task to do this kind of check?
+                // This kind of check may only need to be executed when a master server starts.
                 failoverService.checkMasterFailover();
             } catch (Exception e) {
                 logger.error("failover execute error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 5c97c0b41f..e688863d06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -165,7 +165,7 @@ public class MasterSchedulerService extends Thread {
                 continue;
             }
 
-            WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
+            WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable(
                     processInstance
                     , processService
                     , nettyExecutorManager
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index ba614e573b..7635209ebb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 
 import org.apache.hadoop.util.ThreadUtil;
 
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
@@ -112,7 +113,7 @@ public class StateWheelExecuteThread extends Thread {
             if (processInstanceId == null) {
                 continue;
             }
-            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
                 logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
                 processInstanceTimeoutCheckList.remove(processInstanceId);
@@ -219,20 +220,21 @@ public class StateWheelExecuteThread extends Thread {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
 
-            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
                 logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                 continue;
             }
-            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
-            if (taskInstance == null) {
+            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+            if (!taskInstanceOptional.isPresent()) {
                 logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceTimeoutCheckList.remove(taskInstanceKey);
                 continue;
             }
+            TaskInstance taskInstance = taskInstanceOptional.get();
             if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
                 long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
                 if (timeRemain < 0) {
@@ -252,7 +254,7 @@ public class StateWheelExecuteThread extends Thread {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
 
-            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
 
             if (workflowExecuteThread == null) {
                 logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
@@ -261,7 +263,7 @@ public class StateWheelExecuteThread extends Thread {
                 continue;
             }
 
-            TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
             ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
 
             if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@@ -270,13 +272,14 @@ public class StateWheelExecuteThread extends Thread {
                 break;
             }
 
-            if (taskInstance == null) {
+            if (!taskInstanceOptional.isPresent()) {
                 logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceRetryCheckList.remove(taskInstanceKey);
                 continue;
             }
 
+            TaskInstance taskInstance = taskInstanceOptional.get();
             if (taskInstance.retryTaskIntervalOverTime()) {
                 // reset taskInstance endTime and state
                 // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
@@ -297,20 +300,21 @@ public class StateWheelExecuteThread extends Thread {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
 
-            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
                 logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceStateCheckList.remove(taskInstanceKey);
                 continue;
             }
-            TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
-            if (taskInstance == null) {
+            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+            if (!taskInstanceOptional.isPresent()) {
                 logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
                         processInstanceId, taskCode);
                 taskInstanceStateCheckList.remove(taskInstanceKey);
                 continue;
             }
+            TaskInstance taskInstance = taskInstanceOptional.get();
             if (taskInstance.getState().typeIsFinished()) {
                 continue;
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
similarity index 96%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index fede5abad2..67bfe3c928 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -100,34 +100,34 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 
 /**
- * master exec thread,split dag
+ * Workflow execute task, used to execute a workflow instance.
  */
-public class WorkflowExecuteThread {
+public class WorkflowExecuteRunnable implements Runnable {
 
     /**
      * logger of WorkflowExecuteThread
      */
-    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
 
     /**
      * master config
      */
-    private MasterConfig masterConfig;
+    private final MasterConfig masterConfig;
 
     /**
      * process service
      */
-    private ProcessService processService;
+    private final ProcessService processService;
 
     /**
      * alert manager
      */
-    private ProcessAlertManager processAlertManager;
+    private final ProcessAlertManager processAlertManager;
 
     /**
      * netty executor manager
      */
-    private NettyExecutorManager nettyExecutorManager;
+    private final NettyExecutorManager nettyExecutorManager;
 
     /**
      * process instance
@@ -162,7 +162,7 @@ public class WorkflowExecuteThread {
     /**
      * task instance hash map, taskId as key
      */
-    private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
+    private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
 
     /**
      * running taskProcessor, taskCode as key, taskProcessor as value
@@ -174,34 +174,34 @@ public class WorkflowExecuteThread {
      * valid task map, taskCode as key, taskId as value
      * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
+    private final Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
 
     /**
      * error task map, taskCode as key, taskInstanceId as value
      * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
+    private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
 
     /**
      * complete task map, taskCode as key, taskInstanceId as value
      * in a DAG, only one taskInstance per taskCode is valid
      */
-    private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
+    private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
 
     /**
      * depend failed task map, taskCode as key, taskId as value
      */
-    private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
+    private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
 
     /**
      * forbidden task map, code as key
      */
-    private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
+    private final Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
 
     /**
      * skip task map, code as key
      */
-    private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
+    private final Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
 
     /**
      * complement date list
@@ -211,27 +211,25 @@ public class WorkflowExecuteThread {
     /**
      * state event queue
      */
-    private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
 
     /**
      * ready to submit task queue
      */
-    private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
+    private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
 
     /**
      * wait to retry taskInstance map, taskCode as key, taskInstance as value
      * before retry, the taskInstance id is 0
      */
-    private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
+    private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
 
     /**
      * state wheel execute thread
      */
-    private StateWheelExecuteThread stateWheelExecuteThread;
+    private final StateWheelExecuteThread stateWheelExecuteThread;
 
     /**
-     * constructor of WorkflowExecuteThread
-     *
      * @param processInstance         processInstance
      * @param processService          processService
      * @param nettyExecutorManager    nettyExecutorManager
@@ -239,12 +237,12 @@ public class WorkflowExecuteThread {
      * @param masterConfig            masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
-    public WorkflowExecuteThread(ProcessInstance processInstance,
-                                 ProcessService processService,
-                                 NettyExecutorManager nettyExecutorManager,
-                                 ProcessAlertManager processAlertManager,
-                                 MasterConfig masterConfig,
-                                 StateWheelExecuteThread stateWheelExecuteThread) {
+    public WorkflowExecuteRunnable(ProcessInstance processInstance
+            , ProcessService processService
+            , NettyExecutorManager nettyExecutorManager
+            , ProcessAlertManager processAlertManager
+            , MasterConfig masterConfig
+            , StateWheelExecuteThread stateWheelExecuteThread) {
         this.processService = processService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
@@ -401,7 +399,10 @@ public class WorkflowExecuteThread {
             return true;
         }
 
-        TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+        Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
+        TaskInstance task = taskInstanceOptional.orElseThrow(
+                () -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
+
         if (task.getState() == null) {
             logger.error("task state is null, state handler error: {}", stateEvent);
             return true;
@@ -632,37 +633,37 @@ public class WorkflowExecuteThread {
     /**
      * get task instance from memory
      */
-    public TaskInstance getTaskInstance(int taskInstanceId) {
+    public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
         if (taskInstanceMap.containsKey(taskInstanceId)) {
-            return taskInstanceMap.get(taskInstanceId);
+            return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
         }
-        return null;
+        return Optional.empty();
     }
 
-    public TaskInstance getTaskInstance(long taskCode) {
-        if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
-            return null;
+    public Optional<TaskInstance> getTaskInstance(long taskCode) {
+        if (taskInstanceMap.isEmpty()) {
+            return Optional.empty();
         }
         for (TaskInstance taskInstance : taskInstanceMap.values()) {
             if (taskInstance.getTaskCode() == taskCode) {
-                return taskInstance;
+                return Optional.of(taskInstance);
             }
         }
-        return null;
+        return Optional.empty();
     }
 
-    public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
+    public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
         if (activeTaskProcessorMaps.containsKey(taskCode)) {
-            return activeTaskProcessorMaps.get(taskCode).taskInstance();
+            return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance());
         }
-        return null;
+        return Optional.empty();
     }
 
-    public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
+    public Optional<TaskInstance> getRetryTaskInstanceByTaskCode(long taskCode) {
         if (waitToRetryTaskInstanceMap.containsKey(taskCode)) {
-            return waitToRetryTaskInstanceMap.get(taskCode);
+            return Optional.ofNullable(waitToRetryTaskInstanceMap.get(taskCode));
         }
-        return null;
+        return Optional.empty();
     }
 
     private boolean processStateChangeHandler(StateEvent stateEvent) {
@@ -697,7 +698,9 @@ public class WorkflowExecuteThread {
 
     private boolean processBlockHandler(StateEvent stateEvent) {
         try {
-            TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
+            Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
+            TaskInstance task = taskInstanceOptional.orElseThrow(
+                    () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
             if (!checkTaskInstanceByStateEvent(stateEvent)) {
                 logger.error("task {} is not a blocking task", task.getTaskCode());
                 return false;
@@ -790,9 +793,10 @@ public class WorkflowExecuteThread {
     }
 
     /**
-     * process start handle
+     * ProcessInstance start entrypoint.
      */
-    public void startProcess() {
+    @Override
+    public void run() {
         if (this.taskInstanceMap.size() > 0) {
             return;
         }
@@ -1305,9 +1309,9 @@ public class WorkflowExecuteThread {
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
             TaskNode taskNodeObject = dag.getNode(taskNode);
-            TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode());
-            if (existTaskInstance != null) {
-                taskInstances.add(existTaskInstance);
+            Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
+            if (existTaskInstanceOptional.isPresent()) {
+                taskInstances.add(existTaskInstanceOptional.get());
                 continue;
             }
             TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@@ -2024,8 +2028,6 @@ public class WorkflowExecuteThread {
         }
     }
 
-}
-
     private void measureTaskState(StateEvent taskStateEvent) {
         if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
             // the event is broken
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 556f36a1b7..c3d771e365 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -68,7 +68,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * multi-thread filter, avoid handling workflow at the same time
      */
-    private ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap();
+    private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
 
     @PostConstruct
     private void init() {
@@ -82,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
      * submit state event
      */
     public void submitStateEvent(StateEvent stateEvent) {
-        WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+        WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
         if (workflowExecuteThread == null) {
             logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
             return;
@@ -93,14 +93,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * start workflow
      */
-    public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
-        submit(workflowExecuteThread::startProcess);
+    public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
+        submit(workflowExecuteThread);
     }
 
     /**
      * execute workflow
      */
-    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
+    public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
         if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
             return;
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 2557b659e0..dad2dbfe1e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
@@ -36,13 +39,15 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,12 +75,14 @@ public class FailoverService {
     /**
      * check master failover
      */
+    @Counted(value = "failover_scheduler_check_task_count")
+    @Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
     public void checkMasterFailover() {
         List<String> hosts = getNeedFailoverMasterServers();
         if (CollectionUtils.isEmpty(hosts)) {
             return;
         }
-        LOGGER.info("need failover hosts:{}", hosts);
+        LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
 
         for (String host : hosts) {
             failoverMasterWithLock(host);
@@ -114,9 +121,9 @@ public class FailoverService {
     }
 
     /**
-     * failover master
-     * <p>
-     * failover process instance and associated task instance
+     * Fail over a master: fail over its process instances and their associated task instances.
+     * <p>When a process instance belongs to the given masterHost and its restartTime is before the current server start up time,
+     * the process instance will be failed over.
      *
      * @param masterHost master host
      */
@@ -125,11 +132,11 @@ public class FailoverService {
             return;
         }
         Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
-        long startTime = System.currentTimeMillis();
+        StopWatch failoverTimeCost = StopWatch.createStarted();
         List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
-        LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
+        LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
 
-        // servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail.
+        // the servers list must contain both master hosts and worker hosts, otherwise failover of logic tasks will fail.
         List<Server> servers = registryClient.getServerList(NodeType.WORKER);
         servers.addAll(registryClient.getServerList(NodeType.MASTER));
 
@@ -145,7 +152,7 @@ public class FailoverService {
             }
 
             if (serverStartupTime != null && processInstance.getRestartTime() != null
-                && processInstance.getRestartTime().after(serverStartupTime)) {
+                    && processInstance.getRestartTime().after(serverStartupTime)) {
                 continue;
             }
 
@@ -155,7 +162,8 @@ public class FailoverService {
             processService.processNeedFailoverProcessInstances(processInstance);
         }
 
-        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
+        failoverTimeCost.stop();
+        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
     }
 
     /**
@@ -248,7 +256,10 @@ public class FailoverService {
     }
 
     /**
-     * get need failover master servers
+     * Get the master servers that need failover.
+     * <p>
+     * Query the process instances from the database; if a processInstance's host doesn't exist in the registry
+     * or the host is the current server, then that host needs to be failed over.
      *
      * @return need failover master servers
      */
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
similarity index 95%
rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
index 6e1c84382d..339c5e80ae 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
@@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
-
 import static org.powermock.api.mockito.PowerMockito.mock;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -37,7 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -70,10 +69,10 @@ import org.springframework.context.ApplicationContext;
  * test for WorkflowExecuteThread
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({WorkflowExecuteThread.class})
-public class WorkflowExecuteThreadTest {
+@PrepareForTest({WorkflowExecuteRunnable.class})
+public class WorkflowExecuteTaskTest {
 
-    private WorkflowExecuteThread workflowExecuteThread;
+    private WorkflowExecuteRunnable workflowExecuteThread;
 
     private ProcessInstance processInstance;
 
@@ -118,9 +117,9 @@ public class WorkflowExecuteThreadTest {
         Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
         stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
+        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread));
         // prepareProcess init dag
-        Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
+        Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
         dag.set(workflowExecuteThread, new DAG());
         PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
@@ -132,7 +131,7 @@ public class WorkflowExecuteThreadTest {
             Map<String, String> cmdParam = new HashMap<>();
             cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");
             Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
-            Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+            Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
             Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);
             method.setAccessible(true);
             List<String> nodeNames = (List<String>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@@ -158,7 +157,7 @@ public class WorkflowExecuteThreadTest {
             Mockito.when(processService.findTaskInstanceByIdList(
                     Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
             ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
-            Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+            Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
             Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
             method.setAccessible(true);
             List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@@ -202,7 +201,7 @@ public class WorkflowExecuteThreadTest {
             completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
             completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
 
-            Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
+            Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
 
             Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
             completeTaskMapField.setAccessible(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
index 6fa9116e63..d7b53f3c34 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.cache.impl;
 
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import java.util.Collection;
 
@@ -37,7 +37,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
     private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
 
     @Mock
-    private WorkflowExecuteThread workflowExecuteThread;
+    private WorkflowExecuteRunnable workflowExecuteThread;
 
     @Before
     public void before() {
@@ -47,7 +47,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
 
     @Test
     public void testGetByProcessInstanceId() {
-        WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
+        WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
         Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey());
     }
 
@@ -59,20 +59,20 @@ public class ProcessInstanceExecCacheManagerImplTest {
     @Test
     public void testCacheNull() {
         processInstanceExecCacheManager.cache(2, null);
-        WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
+        WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
         Assert.assertNull(workflowExecuteThread);
     }
 
     @Test
     public void testRemoveByProcessInstanceId() {
         processInstanceExecCacheManager.removeByProcessInstanceId(1);
-        WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
+        WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
         Assert.assertNull(workflowExecuteThread);
     }
 
     @Test
     public void testGetAll() {
-        Collection<WorkflowExecuteThread> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
+        Collection<WorkflowExecuteRunnable> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
         Assert.assertEquals(1, workflowExecuteThreads.size());
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index d3d5b03de9..2a4b4c33ff 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -111,13 +111,6 @@ public class MasterRegistryClientTest {
         masterRegistryClient.registry();
     }
 
-    @Test
-    public void handleConnectionStateTest() {
-        masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
-        masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
-        masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
-    }
-
     @Test
     public void removeNodePathTest() {
         masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 17b4df14a9..129aaff0de 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
                 registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
             }
         } catch (Throwable ex) {
-            logger.error("error write heartbeat info", ex);
+            logger.error("HeartBeat task execute failed", ex);
         }
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 2f98b09b34..c606007e88 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.service.registry;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
 import static org.apache.dolphinscheduler.common.Constants.COLON;
 import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
@@ -29,8 +30,6 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
 import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;