You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/16 15:45:42 UTC

[dolphinscheduler] branch dev updated: [Improvement-7213][MasterServer] execute thread pool code optimization (#7258)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41bf1a9  [Improvement-7213][MasterServer] execute thread pool code optimization (#7258)
41bf1a9 is described below

commit 41bf1a955e2d76414ee5d8e3371fc163a813b435
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Thu Dec 16 23:45:32 2021 +0800

    [Improvement-7213][MasterServer] execute thread pool code optimization (#7258)
    
    * threadpool optimization
    
    * threadpool params
    
    * rebase dev
    
    * ut check fix
    
    * add return
    
    * rebase dev
    
    * event loop
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../apache/dolphinscheduler/common/Constants.java  |   8 +
 .../server/master/MasterServer.java                |  15 +-
 .../processor/queue/StateEventResponseService.java |  12 +-
 .../processor/queue/TaskResponseService.java       |  22 ++-
 .../master/registry/MasterRegistryClient.java      |  29 +++-
 .../server/master/runner/EventExecuteService.java  | 146 +----------------
 .../master/runner/MasterSchedulerService.java      |  52 ++----
 .../master/runner/StateWheelExecuteThread.java     | 177 ++++++++++++++------
 .../master/runner/WorkflowExecuteThread.java       | 108 ++++--------
 .../master/runner/WorkflowExecuteThreadPool.java   | 181 +++++++++++++++++++++
 .../server/master/WorkflowExecuteThreadTest.java   |   8 +-
 11 files changed, 414 insertions(+), 344 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 7d6428c..6632e48 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -345,9 +345,17 @@ public final class Constants {
      */
     public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *";
 
+    /**
+     * sleep 1000ms
+     */
     public static final int SLEEP_TIME_MILLIS = 1000;
 
     /**
+     * short sleep 100ms
+     */
+    public static final int SLEEP_TIME_MILLIS_SHORT = 100;
+
+    /**
      * one second mils
      */
     public static final int SECOND_TIME_MILLIS = 1000;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 41bc63a..11b0b79 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -34,6 +34,9 @@ import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import javax.annotation.PostConstruct;
+
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
@@ -45,8 +48,6 @@ import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
-import javax.annotation.PostConstruct;
-
 @SpringBootApplication
 @ComponentScan("org.apache.dolphinscheduler")
 @EnableTransactionManagement
@@ -69,9 +70,6 @@ public class MasterServer implements IStoppable {
     private MasterSchedulerService masterSchedulerService;
 
     @Autowired
-    private EventExecuteService eventExecuteService;
-
-    @Autowired
     private Scheduler scheduler;
 
     @Autowired
@@ -89,6 +87,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private CacheProcessor cacheProcessor;
 
+    @Autowired
+    private EventExecuteService eventExecuteService;
+
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
         SpringApplication.run(MasterServer.class);
@@ -117,11 +118,11 @@ public class MasterServer implements IStoppable {
         this.masterRegistryClient.start();
         this.masterRegistryClient.setRegistryStoppable(this);
 
-        this.eventExecuteService.init();
-        this.eventExecuteService.start();
         this.masterSchedulerService.init();
         this.masterSchedulerService.start();
 
+        this.eventExecuteService.start();
+
         this.scheduler.start();
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 1db91c6..11fc060 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -63,6 +64,9 @@ public class StateEventResponseService {
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
     @PostConstruct
     public void start() {
         this.responseWorker = new StateEventResponseWorker();
@@ -141,7 +145,7 @@ public class StateEventResponseService {
                     break;
                 default:
             }
-            workflowExecuteThread.addStateEvent(stateEvent);
+            workflowExecuteThreadPool.submitStateEvent(stateEvent);
             writeResponse(stateEvent, ExecutionStatus.SUCCESS);
         } catch (Exception e) {
             logger.error("persist event queue error, event: {}", stateEvent, e);
@@ -149,10 +153,6 @@ public class StateEventResponseService {
     }
 
     public void addEvent2WorkflowExecute(StateEvent stateEvent) {
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
-        workflowExecuteThread.addStateEvent(stateEvent);
-    }
-    public BlockingQueue<StateEvent> getEventQueue() {
-        return eventQueue;
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 7a0af9d..8a5755c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
@@ -74,6 +75,9 @@ public class TaskResponseService {
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
     @PostConstruct
     public void start() {
         this.taskResponseWorker = new TaskResponseWorker();
@@ -164,20 +168,16 @@ public class TaskResponseService {
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
 
-        if (workflowExecuteThread != null) {
-            StateEvent stateEvent = new StateEvent();
-            stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
-            stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
-            stateEvent.setExecutionStatus(taskResponseEvent.getState());
-            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-            workflowExecuteThread.addStateEvent(stateEvent);
-        }
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
+        stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
+        stateEvent.setExecutionStatus(taskResponseEvent.getState());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
     /**
      * handle ack event
-     * @param taskResponseEvent
-     * @param taskInstance
      */
     private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
         Channel channel = taskResponseEvent.getChannel();
@@ -206,8 +206,6 @@ public class TaskResponseService {
 
     /**
      * handle result event
-     * @param taskResponseEvent
-     * @param taskInstance
      */
     private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
         Channel channel = taskResponseEvent.getChannel();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 7619535..67c8e94 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -96,7 +95,7 @@ public class MasterRegistryClient {
     private ScheduledExecutorService heartBeatExecutor;
 
     @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
     /**
      * master startup time, ms
@@ -298,6 +297,24 @@ public class MasterRegistryClient {
                     continue;
                 }
                 processInstanceCacheMap.put(processInstance.getId(), processInstance);
+                taskInstance.setProcessInstance(processInstance);
+
+                TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                        .buildTaskInstanceRelatedInfo(taskInstance)
+                        .buildProcessInstanceRelatedInfo(processInstance)
+                        .create();
+                // only kill yarn job if exists , the local thread has exited
+                ProcessUtils.killYarnJob(taskExecutionContext);
+
+                taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+                processService.saveTaskInstance(taskInstance);
+
+                StateEvent stateEvent = new StateEvent();
+                stateEvent.setTaskInstanceId(taskInstance.getId());
+                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+                stateEvent.setProcessInstanceId(processInstance.getId());
+                stateEvent.setExecutionStatus(taskInstance.getState());
+                workflowExecuteThreadPool.submitStateEvent(stateEvent);
             }
 
             // only failover the task owned myself if worker down.
@@ -375,16 +392,12 @@ public class MasterRegistryClient {
         taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
         processService.saveTaskInstance(taskInstance);
 
-        WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
-        if (workflowExecuteThreadNotify == null) {
-            return;
-        }
         StateEvent stateEvent = new StateEvent();
         stateEvent.setTaskInstanceId(taskInstance.getId());
         stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
         stateEvent.setProcessInstanceId(processInstance.getId());
         stateEvent.setExecutionStatus(taskInstance.getState());
-        workflowExecuteThreadNotify.addStateEvent(stateEvent);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 3da043c..6f6f718 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -18,27 +18,9 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
-import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.commons.lang.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -46,48 +28,19 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 @Service
 public class EventExecuteService extends Thread {
 
     private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
 
-
-    /**
-     * dolphinscheduler database interface
-     */
-    @Autowired
-    private ProcessService processService;
-
     @Autowired
-    private MasterConfig masterConfig;
-
-    private ExecutorService eventExecService;
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
     /**
-     *
+     * workflow exec service
      */
-    private StateEventCallbackService stateEventCallbackService;
-
     @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
-    private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap();
-    ListeningExecutorService listeningExecutorService;
-
-    public void init() {
-
-        eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getExecThreads());
-
-        listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService);
-        this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
-
-    }
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
     @Override
     public synchronized void start() {
@@ -95,20 +48,13 @@ public class EventExecuteService extends Thread {
         super.start();
     }
 
-    public void close() {
-        eventExecService.shutdown();
-        logger.info("event service stopped...");
-    }
-
     @Override
     public void run() {
         logger.info("Event service started");
         while (Stopper.isRunning()) {
             try {
                 eventHandler();
-
-                TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
-
+                TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
             } catch (Exception e) {
                 logger.error("Event service thread error", e);
             }
@@ -117,89 +63,7 @@ public class EventExecuteService extends Thread {
 
     private void eventHandler() {
         for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
-            if (workflowExecuteThread.eventSize() == 0
-                    || StringUtils.isEmpty(workflowExecuteThread.getKey())
-                    || !workflowExecuteThread.isStart()
-                    || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
-                continue;
-            }
-            int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
-            logger.info("handle process instance : {} , events count:{}",
-                    processInstanceId,
-                    workflowExecuteThread.eventSize());
-            logger.info("already exists handler process size:{}", this.eventHandlerMap.size());
-            eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
-            ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
-            FutureCallback futureCallback = new FutureCallback() {
-                @Override
-                public void onSuccess(Object o) {
-                    if (workflowExecuteThread.workFlowFinish()) {
-                        processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
-                        notifyProcessChanged();
-                        logger.info("process instance {} finished.", processInstanceId);
-                    }
-                    if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) {
-                        processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
-                        processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread);
-
-                    }
-                    eventHandlerMap.remove(workflowExecuteThread.getKey());
-                }
-
-                private void notifyProcessChanged() {
-                    if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
-                        return;
-                    }
-
-                    Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(processInstanceId);
-                    for (ProcessInstance processInstance : fatherMaps.keySet()) {
-                        String address = NetUtils.getAddr(masterConfig.getListenPort());
-                        if (processInstance.getHost().equalsIgnoreCase(address)) {
-                            notifyMyself(processInstance, fatherMaps.get(processInstance));
-                        } else {
-                            notifyProcess(processInstance, fatherMaps.get(processInstance));
-                        }
-                    }
-                }
-
-                private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
-                    logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
-                    if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
-                        return;
-                    }
-                    WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
-                    StateEvent stateEvent = new StateEvent();
-                    stateEvent.setTaskInstanceId(taskInstance.getId());
-                    stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                    stateEvent.setProcessInstanceId(processInstance.getId());
-                    stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-                    workflowExecuteThreadNotify.addStateEvent(stateEvent);
-                }
-
-                private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) {
-                    String host = processInstance.getHost();
-                    if (StringUtils.isEmpty(host)) {
-                        logger.info("process {} host is empty, cannot notify task {} now.",
-                                processInstance.getId(), taskInstance.getId());
-                        return;
-                    }
-                    String address = host.split(":")[0];
-                    int port = Integer.parseInt(host.split(":")[1]);
-                    logger.info("notify process {} task {} state change, host:{}",
-                            processInstance.getId(), taskInstance.getId(), host);
-                    StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
-                            processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId()
-                    );
-                    stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
-                }
-
-                @Override
-                public void onFailure(Throwable throwable) {
-                    logger.info("handle events {} failed.", processInstanceId);
-                    logger.info("handle events failed.", throwable);
-                }
-            };
-            Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+            workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
         }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index e13de47..f3cdb4b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -40,10 +39,8 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +63,10 @@ public class MasterSchedulerService extends Thread {
      */
     @Autowired
     private ProcessService processService;
+
+    /**
+     * task processor factory
+     */
     @Autowired
     private TaskProcessorFactory taskProcessorFactory;
 
@@ -95,28 +96,15 @@ public class MasterSchedulerService extends Thread {
     private ThreadPoolExecutor masterPrepareExecService;
 
     /**
-     * master exec service
+     * workflow exec service
      */
-    private ThreadPoolExecutor masterExecService;
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
-    /**
-     * process timeout check list
-     */
-    ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>();
-
-    /**
-     * task time out check list
-     */
-    ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
-
-    /**
-     * task retry check list
-     */
-    ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
-
+    @Autowired
     private StateWheelExecuteThread stateWheelExecuteThread;
 
     /**
@@ -124,15 +112,8 @@ public class MasterSchedulerService extends Thread {
      */
     public void init() {
         this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
-        this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
-
-        stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
-                taskTimeoutCheckList,
-                taskRetryCheckList,
-                this.processInstanceExecCacheManager,
-                masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
     }
 
     @Override
@@ -143,16 +124,6 @@ public class MasterSchedulerService extends Thread {
     }
 
     public void close() {
-        masterExecService.shutdown();
-        boolean terminated = false;
-        try {
-            terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException ignore) {
-            Thread.currentThread().interrupt();
-        }
-        if (!terminated) {
-            logger.warn("masterExecService shutdown without terminated, increase await time");
-        }
         nettyRemotingClient.close();
         logger.info("master schedule service stopped...");
     }
@@ -205,15 +176,14 @@ public class MasterSchedulerService extends Thread {
                     , nettyExecutorManager
                     , processAlertManager
                     , masterConfig
-                    , taskTimeoutCheckList
-                    , taskRetryCheckList
+                    , stateWheelExecuteThread
                     , taskProcessorFactory);
 
             this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
             if (processInstance.getTimeout() > 0) {
-                this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
+                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
             }
-            masterExecService.execute(workflowExecuteThread);
+            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index d697ab1..4502b86 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -25,47 +25,57 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 
 import org.apache.hadoop.util.ThreadUtil;
 
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * 1. timeout check wheel
  * 2. dependent task check wheel
  */
+@Component
 public class StateWheelExecuteThread extends Thread {
 
     private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
 
-    private ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList;
-    private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
-    private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+    /**
+     * process timeout check list
+     */
+    private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
 
-    private int stateCheckIntervalSecs;
-
-    public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
-                                   ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
-                                   ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
-                                   ProcessInstanceExecCacheManager processInstanceExecCacheManager,
-                                   int stateCheckIntervalSecs) {
-        this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
-        this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
-        this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
-        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
-        this.stateCheckIntervalSecs = stateCheckIntervalSecs;
-    }
+    /**
+     * task time out check list, key is taskInstanceId, value is processInstanceId
+     */
+    private ConcurrentHashMap<Integer, Integer> taskInstanceTimeoutCheckList = new ConcurrentHashMap<>();
+
+    /**
+     * task retry check list, key is taskInstanceId, value is processInstanceId
+     */
+    private ConcurrentHashMap<Integer, Integer> taskInstanceRetryCheckList = new ConcurrentHashMap<>();
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
     @Override
     public void run() {
-
-        logger.info("state wheel thread start");
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
@@ -74,30 +84,83 @@ public class StateWheelExecuteThread extends Thread {
             } catch (Exception e) {
                 logger.error("state wheel thread check error:", e);
             }
-            ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs);
+            ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
         }
     }
 
     public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
-        this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance);
+        processInstanceTimeoutCheckList.add(processInstance.getId());
+    }
+
+    public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
+        processInstanceTimeoutCheckList.remove(processInstance.getId());
     }
 
     public void addTask4TimeoutCheck(TaskInstance taskInstance) {
-        this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance);
+        if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
+            return;
+        }
+        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+        if (taskDefinition == null) {
+            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+            return;
+        }
+        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
+            taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+        }
+        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+            taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+        }
+    }
+
+    public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
+        taskInstanceTimeoutCheckList.remove(taskInstance.getId());
     }
 
     public void addTask4RetryCheck(TaskInstance taskInstance) {
-        this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance);
+        if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
+            return;
+        }
+        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+        if (taskDefinition == null) {
+            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+            return;
+        }
+        if (taskInstance.taskCanRetry()) {
+            taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+        }
+
+        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+            taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
+        }
     }
 
-    public void checkTask4Timeout() {
+    public void removeTask4RetryCheck(TaskInstance taskInstance) {
+        taskInstanceRetryCheckList.remove(taskInstance.getId());
+    }
+
+    private void checkTask4Timeout() {
         if (taskInstanceTimeoutCheckList.isEmpty()) {
             return;
         }
-        for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
+        for (Entry<Integer, Integer> entry : taskInstanceTimeoutCheckList.entrySet()) {
+            int processInstanceId = entry.getValue();
+            int taskInstanceId = entry.getKey();
+
+            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            if (workflowExecuteThread == null) {
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
+                        processInstanceId, taskInstanceId);
+                taskInstanceTimeoutCheckList.remove(taskInstanceId);
+                continue;
+            }
+            TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+            if (taskInstance == null) {
+                continue;
+            }
             if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
-                long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
-                if (0 >= timeRemain) {
+                long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+                if (timeRemain < 0) {
                     addTaskTimeoutEvent(taskInstance);
                     taskInstanceTimeoutCheckList.remove(taskInstance.getId());
                 }
@@ -109,8 +172,21 @@ public class StateWheelExecuteThread extends Thread {
         if (taskInstanceRetryCheckList.isEmpty()) {
             return;
         }
-
-        for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) {
+        for (Entry<Integer, Integer> entry : taskInstanceRetryCheckList.entrySet()) {
+            int processInstanceId = entry.getValue();
+            int taskInstanceId = entry.getKey();
+
+            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            if (workflowExecuteThread == null) {
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
+                        processInstanceId, taskInstanceId);
+                taskInstanceRetryCheckList.remove(taskInstanceId);
+                continue;
+            }
+            TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
+            if (taskInstance == null) {
+                continue;
+            }
             if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
                 addTaskStateChangeEvent(taskInstance);
                 taskInstanceRetryCheckList.remove(taskInstance.getId());
@@ -125,49 +201,50 @@ public class StateWheelExecuteThread extends Thread {
         if (processInstanceTimeoutCheckList.isEmpty()) {
             return;
         }
-        for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
-
-            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
-            if (0 >= timeRemain) {
+        for (Integer processInstanceId : processInstanceTimeoutCheckList) {
+            if (processInstanceId == null) {
+                continue;
+            }
+            WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+            if (workflowExecuteThread == null) {
+                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
+                processInstanceTimeoutCheckList.remove(processInstanceId);
+                continue;
+            }
+            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+            if (processInstance == null) {
+                continue;
+            }
+            long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+            if (timeRemain < 0) {
                 addProcessTimeoutEvent(processInstance);
                 processInstanceTimeoutCheckList.remove(processInstance.getId());
             }
         }
     }
 
-    private boolean addTaskStateChangeEvent(TaskInstance taskInstance) {
+    private void addTaskStateChangeEvent(TaskInstance taskInstance) {
         StateEvent stateEvent = new StateEvent();
         stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
         stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
         stateEvent.setTaskInstanceId(taskInstance.getId());
         stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-        addEvent(stateEvent);
-        return true;
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
-    private boolean addTaskTimeoutEvent(TaskInstance taskInstance) {
+    private void addTaskTimeoutEvent(TaskInstance taskInstance) {
         StateEvent stateEvent = new StateEvent();
         stateEvent.setType(StateEventType.TASK_TIMEOUT);
         stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
         stateEvent.setTaskInstanceId(taskInstance.getId());
-        addEvent(stateEvent);
-        return true;
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
-    private boolean addProcessTimeoutEvent(ProcessInstance processInstance) {
+    private void addProcessTimeoutEvent(ProcessInstance processInstance) {
         StateEvent stateEvent = new StateEvent();
         stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
         stateEvent.setProcessInstanceId(processInstance.getId());
-        addEvent(stateEvent);
-        return true;
-    }
-
-    private void addEvent(StateEvent stateEvent) {
-        if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
-            return;
-        }
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
-        workflowExecuteThread.addStateEvent(stateEvent);
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 24d49e8..c20ddf7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -96,7 +96,7 @@ import com.google.common.collect.Lists;
 /**
  * master exec thread,split dag
  */
-public class WorkflowExecuteThread implements Runnable {
+public class WorkflowExecuteThread {
 
     /**
      * logger of WorkflowExecuteThread
@@ -204,16 +204,6 @@ public class WorkflowExecuteThread implements Runnable {
     private List<Date> complementListDate = Lists.newLinkedList();
 
     /**
-     * task timeout check list
-     */
-    private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
-
-    /**
-     * task retry check list
-     */
-    private ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList;
-
-    /**
      * state event queue
      */
     private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
@@ -224,6 +214,11 @@ public class WorkflowExecuteThread implements Runnable {
     private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
 
     /**
+     * state wheel execute thread
+     */
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
+    /**
      * constructor of WorkflowExecuteThread
      *
      * @param processInstance processInstance
@@ -231,7 +226,7 @@ public class WorkflowExecuteThread implements Runnable {
      * @param nettyExecutorManager nettyExecutorManager
      * @param processAlertManager processAlertManager
      * @param masterConfig masterConfig
-     * @param taskTimeoutCheckList taskTimeoutCheckList
+     * @param stateWheelExecuteThread stateWheelExecuteThread
      * @param taskProcessorFactory taskProcessorFactory
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
@@ -239,32 +234,17 @@ public class WorkflowExecuteThread implements Runnable {
             , NettyExecutorManager nettyExecutorManager
             , ProcessAlertManager processAlertManager
             , MasterConfig masterConfig
-            , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
-            , ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList
+            , StateWheelExecuteThread stateWheelExecuteThread
             , TaskProcessorFactory taskProcessorFactory) {
         this.processService = processService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
-        this.taskTimeoutCheckList = taskTimeoutCheckList;
-        this.taskRetryCheckList = taskRetryCheckList;
+        this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.taskProcessorFactory = taskProcessorFactory;
     }
 
-    @Override
-    public void run() {
-        try {
-            if (!this.isStart()) {
-                startProcess();
-            } else {
-                handleEvents();
-            }
-        } catch (Exception e) {
-            logger.error("handler error:", e);
-        }
-    }
-
     /**
      * the process start nodes are submitted completely.
      */
@@ -272,9 +252,14 @@ public class WorkflowExecuteThread implements Runnable {
         return this.isStart;
     }
 
-    private void handleEvents() {
+    /**
+     * handle event
+     */
+    public void handleEvents() {
+        if (!isStart) {
+            return;
+        }
         while (!this.stateEvents.isEmpty()) {
-
             try {
                 StateEvent stateEvent = this.stateEvents.peek();
                 if (stateEventHandler(stateEvent)) {
@@ -282,7 +267,6 @@ public class WorkflowExecuteThread implements Runnable {
                 }
             } catch (Exception e) {
                 logger.error("state handle error:", e);
-
             }
         }
     }
@@ -457,8 +441,8 @@ public class WorkflowExecuteThread implements Runnable {
                         task.getRetryTimes(),
                         task.getMaxRetryTimes(),
                         task.getRetryInterval());
-                this.addTimeoutCheck(task);
-                this.addRetryCheck(task);
+                stateWheelExecuteThread.addTask4TimeoutCheck(task);
+                stateWheelExecuteThread.addTask4RetryCheck(task);
             } else {
                 submitStandByTask();
             }
@@ -467,8 +451,8 @@ public class WorkflowExecuteThread implements Runnable {
 
         completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
         activeTaskProcessorMaps.remove(task.getId());
-        taskTimeoutCheckList.remove(task.getId());
-        taskRetryCheckList.remove(task.getId());
+        stateWheelExecuteThread.removeTask4TimeoutCheck(task);
+        stateWheelExecuteThread.removeTask4RetryCheck(task);
 
         if (task.getState().typeIsSuccess()) {
             processInstance.setVarPool(task.getVarPool());
@@ -660,13 +644,21 @@ public class WorkflowExecuteThread implements Runnable {
         return false;
     }
 
-    private void startProcess() throws Exception {
-        if (this.taskInstanceMap.size() == 0) {
+    /**
+     * process start handle
+     */
+    public void startProcess() {
+        if (this.taskInstanceMap.size() > 0) {
+            return;
+        }
+        try {
             isStart = false;
             buildFlowDag();
             initTaskQueue();
             submitPostNode(null);
             isStart = true;
+        } catch (Exception e) {
+            logger.error("start process error, process instance id:{}", processInstance.getId(), e);
         }
     }
 
@@ -837,8 +829,8 @@ public class WorkflowExecuteThread implements Runnable {
             activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
             taskProcessor.run();
 
-            addTimeoutCheck(taskInstance);
-            addRetryCheck(taskInstance);
+            stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
+            stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
 
             if (taskProcessor.taskState().typeIsFinished()) {
                 StateEvent stateEvent = new StateEvent();
@@ -871,42 +863,6 @@ public class WorkflowExecuteThread implements Runnable {
         }
     }
 
-    private void addTimeoutCheck(TaskInstance taskInstance) {
-        if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
-            return;
-        }
-        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
-        if (taskDefinition == null) {
-            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
-            return;
-        }
-        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
-            this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
-        }
-        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
-            this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
-        }
-    }
-
-    private void addRetryCheck(TaskInstance taskInstance) {
-        if (taskRetryCheckList.containsKey(taskInstance.getId())) {
-            return;
-        }
-        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
-        if (taskDefinition == null) {
-            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
-            return;
-        }
-
-        if (taskInstance.taskCanRetry()) {
-            this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
-        }
-
-        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
-            this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
-        }
-    }
-
     /**
      * find task instance in db.
      * in case submit more than one same name task in the same time.
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
new file mode 100644
index 0000000..4587055
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
+import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+/**
+ * Thread pool used to start workflows and to process the state events queued
+ * on each {@link WorkflowExecuteThread}.
+ */
+@Component
+public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class);
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private StateEventCallbackService stateEventCallbackService;
+
+    /**
+     * multi-thread filter, avoid handling workflow at the same time;
+     * key is {@link WorkflowExecuteThread#getKey()}, present while an event-handling task is pending or running
+     */
+    private final ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+
+    /**
+     * Configures the pool: daemon threads, thread name prefix, and core size equal to
+     * max size, both taken from {@code masterConfig.getExecThreads()}.
+     */
+    @PostConstruct
+    private void init() {
+        this.setDaemon(true);
+        this.setThreadNamePrefix("Workflow-Execute-Thread-");
+        this.setMaxPoolSize(masterConfig.getExecThreads());
+        this.setCorePoolSize(masterConfig.getExecThreads());
+    }
+
+    /**
+     * submit state event
+     * <p>
+     * The event is added to the workflow execute thread cached for its process instance.
+     * When no such thread is cached, an error is logged and the event is dropped.
+     *
+     * @param stateEvent event to enqueue
+     */
+    public void submitStateEvent(StateEvent stateEvent) {
+        WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
+        if (workflowExecuteThread == null) {
+            logger.error("workflowExecuteThread is null, processInstanceId:{}", stateEvent.getProcessInstanceId());
+            return;
+        }
+        workflowExecuteThread.addStateEvent(stateEvent);
+    }
+
+    /**
+     * start workflow
+     * <p>
+     * Submits {@link WorkflowExecuteThread#startProcess()} to this pool.
+     *
+     * @param workflowExecuteThread workflow to start
+     */
+    public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
+        submit(workflowExecuteThread::startProcess);
+    }
+
+    /**
+     * execute workflow
+     * <p>
+     * Submits a task that handles the queued events of the given workflow. Nothing is
+     * submitted when the workflow has not started, has no events, or already has an
+     * event-handling task pending or running in this pool.
+     *
+     * @param workflowExecuteThread workflow whose events should be handled
+     */
+    public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
+        if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
+            return;
+        }
+        // resolve everything that may fail before the workflow is registered in the filter
+        final int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
+        final String key = workflowExecuteThread.getKey();
+        // register BEFORE submitting, atomically: registering after handleEvents() has run
+        // would leave the filter empty for the whole time the events are being handled
+        if (multiThreadFilterMap.putIfAbsent(key, workflowExecuteThread) != null) {
+            return;
+        }
+        final ListenableFuture<?> future;
+        try {
+            future = this.submitListenable(workflowExecuteThread::handleEvents);
+        } catch (RuntimeException e) {
+            // submission was rejected: release the filter so later calls can retry
+            multiThreadFilterMap.remove(key);
+            throw e;
+        }
+        future.addCallback(new ListenableFutureCallback<Object>() {
+            @Override
+            public void onFailure(Throwable ex) {
+                logger.error("handle events {} failed", processInstanceId, ex);
+                multiThreadFilterMap.remove(key);
+            }
+
+            @Override
+            public void onSuccess(Object result) {
+                try {
+                    if (workflowExecuteThread.workFlowFinish()) {
+                        processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
+                        notifyProcessChanged(workflowExecuteThread.getProcessInstance());
+                        logger.info("process instance {} finished.", processInstanceId);
+                    }
+                } finally {
+                    // always release the filter, even when the notification above fails
+                    multiThreadFilterMap.remove(key);
+                }
+            }
+        });
+    }
+
+    /**
+     * notify process change
+     * <p>
+     * Does nothing when the finished process instance is not a sub process. Otherwise each
+     * entry returned by {@code processService.notifyProcessList} is notified, locally when
+     * its host equals this master's address and remotely otherwise.
+     *
+     * @param finishProcessInstance process instance that has finished
+     */
+    private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
+        if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
+            return;
+        }
+        Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
+        // the local address is the same for every entry, resolve it once
+        String address = NetUtils.getAddr(masterConfig.getListenPort());
+        for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
+            ProcessInstance processInstance = entry.getKey();
+            TaskInstance taskInstance = entry.getValue();
+            String host = processInstance.getHost();
+            // a missing host is left to notifyProcess, which logs it instead of throwing here
+            if (host != null && host.equalsIgnoreCase(address)) {
+                this.notifyMyself(processInstance, taskInstance);
+            } else {
+                this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
+            }
+        }
+    }
+
+    /**
+     * notify myself
+     * <p>
+     * Submits a TASK_STATE_CHANGE event for the given task to the locally cached process
+     * instance; does nothing (besides logging) when the process instance is not cached.
+     */
+    private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
+        logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
+        if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
+            return;
+        }
+        StateEvent stateEvent = new StateEvent();
+        stateEvent.setTaskInstanceId(taskInstance.getId());
+        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
+        stateEvent.setProcessInstanceId(processInstance.getId());
+        stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+        this.submitStateEvent(stateEvent);
+    }
+
+    /**
+     * notify process's master
+     * <p>
+     * Sends a state event change command to the host recorded on the process instance.
+     * An empty or malformed host (expected form {@code address:port}) is logged and skipped.
+     */
+    private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) {
+        String host = processInstance.getHost();
+        if (StringUtils.isEmpty(host)) {
+            logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId());
+            return;
+        }
+        String[] hostAndPort = host.split(":");
+        if (hostAndPort.length < 2) {
+            logger.error("process {} host {} has no port, cannot notify task {} now", processInstance.getId(), host, taskInstance.getId());
+            return;
+        }
+        String address = hostAndPort[0];
+        int port;
+        try {
+            port = Integer.parseInt(hostAndPort[1]);
+        } catch (NumberFormatException e) {
+            logger.error("process {} host {} has an invalid port, cannot notify task {} now", processInstance.getId(), host, taskInstance.getId(), e);
+            return;
+        }
+        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
+                finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
+        );
+        stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index 936b70f..8db1bd9 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -84,6 +85,8 @@ public class WorkflowExecuteThreadTest {
 
     private TaskProcessorFactory taskProcessorFactory;
 
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
     @Before
     public void init() throws Exception {
         processService = mock(ProcessService.class);
@@ -107,9 +110,8 @@ public class WorkflowExecuteThreadTest {
         processDefinition.setGlobalParamList(Collections.emptyList());
         Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
-        ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
-        ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList, taskProcessorFactory));
+        stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
+        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory));
         // prepareProcess init dag
         Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
         dag.setAccessible(true);