You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/01/10 10:47:31 UTC

[dolphinscheduler] branch 2.0.3-prepare updated: [dolphinscheduler-server] process instance is always running (#7914)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
     new 8dc529d  [dolphinscheduler-server] process instance is always running  (#7914)
8dc529d is described below

commit 8dc529d35f19514123902daf1ad7b0e352988d87
Author: zwZjut <zw...@163.com>
AuthorDate: Mon Jan 10 18:47:26 2022 +0800

    [dolphinscheduler-server] process instance is always running  (#7914)
    
    * #7698
    
    * #7698
    
    * #7698
    
    * fix workflow is always running
    
    * fix workflow is always running
    
    * fix workflow is always running
    
    * #7700 #7698
    
    * add license
    
    * fix ut
    
    * #7700 #7698
    
    Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
 .../dolphinscheduler/common/enums/Event.java       |   3 +-
 .../remote/command/TaskKillResponseCommand.java    |  14 +++
 .../server/master/MasterServer.java                |   4 +-
 .../processor/TaskKillResponseProcessor.java       |  26 ++++++
 .../master/processor/queue/TaskResponseEvent.java  |  15 ++-
 .../processor/queue/TaskResponsePersistThread.java |  12 +++
 .../processor/queue/TaskResponseService.java       |  12 ++-
 .../server/master/runner/MasterExecService.java    | 102 +++++++++++++++++++++
 .../master/runner/MasterSchedulerService.java      |  23 ++++-
 .../master/runner/StateWheelExecuteThread.java     |  36 ++++++--
 .../master/runner/WorkflowExecuteThread.java       |  27 ++++--
 .../master/runner/task/BaseTaskProcessor.java      |  17 ++++
 .../master/runner/task/CommonTaskProcessor.java    |  19 +++-
 .../master/runner/task/ConditionTaskProcessor.java |  18 +++-
 .../master/runner/task/DependentTaskProcessor.java |  20 +++-
 .../server/master/runner/task/ITaskProcessor.java  |   2 +
 .../master/runner/task/SubTaskProcessor.java       |  11 +++
 .../master/runner/task/SwitchTaskProcessor.java    |  19 +++-
 .../server/worker/processor/TaskKillProcessor.java |  11 +++
 .../server/worker/runner/TaskExecuteThread.java    |   4 +
 .../server/worker/runner/WorkerExecService.java    |  85 +++++++++++++++++
 .../server/worker/runner/WorkerManagerThread.java  |  21 ++++-
 .../server/master/WorkflowExecuteThreadTest.java   |   2 +-
 .../processor/TaskKillResponseProcessorTest.java   |  13 +++
 24 files changed, 483 insertions(+), 33 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 9cec276..5adfe63 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -19,5 +19,6 @@ package org.apache.dolphinscheduler.common.enums;
 
 public enum Event {
     ACK,
-    RESULT;
+    RESULT,
+    ACTION_STOP;
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
index 03ad4dd..98fbe0f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
@@ -49,6 +49,11 @@ public class TaskKillResponseCommand implements Serializable {
     private int processId;
 
     /**
+     * process instance id
+     */
+    private int processInstanceId;
+
+    /**
      * other resource manager appId , for example : YARN etc
      */
     private List<String> appIds;
@@ -85,6 +90,14 @@ public class TaskKillResponseCommand implements Serializable {
         this.processId = processId;
     }
 
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
     public List<String> getAppIds() {
         return appIds;
     }
@@ -114,6 +127,7 @@ public class TaskKillResponseCommand implements Serializable {
                 + ", status=" + status
                 + ", processId=" + processId
                 + ", appIds=" + appIds
+                + ", processInstanceId=" + processInstanceId
                 + '}';
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index a5d7cf4..a6642a1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -138,11 +138,13 @@ public class MasterServer implements IStoppable {
         ackProcessor.init(processInstanceExecMaps);
         TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
         taskResponseProcessor.init(processInstanceExecMaps);
+        TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor();
+        taskKillResponseProcessor.init(processInstanceExecMaps);
         StateEventProcessor stateEventProcessor = new StateEventProcessor();
         stateEventProcessor.init(processInstanceExecMaps);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
         this.nettyRemotingServer.start();
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 28f18fe..36dde29 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -17,11 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +45,19 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
     private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
 
     /**
+     * process service
+     */
+    private final TaskResponseService taskResponseService;
+
+    public TaskKillResponseProcessor() {
+        this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
+    }
+
+    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
+        this.taskResponseService.init(processInstanceExecMaps);
+    }
+
+    /**
      * task final result response
      * need master process , state persistence
      *
@@ -50,6 +70,12 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
 
         TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
         logger.info("received task kill response command : {}", responseCommand);
+        // TaskResponseEvent
+        TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(ExecutionStatus.of(responseCommand.getStatus()),
+                responseCommand.getTaskInstanceId(),
+                responseCommand.getProcessInstanceId()
+        );
+        taskResponseService.addResponse(taskResponseEvent);
     }
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 224a617..f2a080c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -17,13 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
-
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 
 import java.util.Date;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+
 import io.netty.channel.Channel;
 
 /**
@@ -94,6 +94,17 @@ public class TaskResponseEvent {
     private Channel channel;
 
     private int processInstanceId;
+
+    public static TaskResponseEvent newActionStop(ExecutionStatus state,
+                                                  int taskInstanceId,
+                                                  int processInstanceId) {
+        TaskResponseEvent event = new TaskResponseEvent();
+        event.setState(state);
+        event.setTaskInstanceId(taskInstanceId);
+        event.setEvent(Event.ACTION_STOP);
+        event.setProcessInstanceId(processInstanceId);
+        return event;
+    }
     
     public static TaskResponseEvent newAck(ExecutionStatus state,
                                            Date startTime,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index 621dd79..ca8ad0a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -146,6 +148,16 @@ public class TaskResponsePersistThread implements Runnable {
                     channel.writeAndFlush(taskResponseCommand.convert2Command());
                 }
                 break;
+            case ACTION_STOP:
+                WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
+                if (workflowExecuteThread != null) {
+                    ITaskProcessor taskProcessor = workflowExecuteThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId());
+                    if (taskProcessor != null) {
+                        taskProcessor.persist(TaskAction.STOP);
+                        logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
+                    }
+                }
+                break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 5ef2350..b5e70ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -115,10 +115,20 @@ public class TaskResponseService {
         try {
             this.taskResponseWorker.interrupt();
             this.taskResponseEventHandler.interrupt();
-            this.eventExecService.shutdown();
         } catch (Exception e) {
             logger.error("stop error:", e);
         }
+        this.eventExecService.shutdown();
+        long waitSec = 5;
+        boolean terminated = false;
+        try {
+            terminated = eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS);
+        } catch (InterruptedException ignore) {
+            Thread.currentThread().interrupt();
+        }
+        if (!terminated) {
+            logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", waitSec);
+        }
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java
new file mode 100644
index 0000000..edb5e66
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class MasterExecService {
+
+    /**
+     * logger of MasterExecService
+     */
+    private static final Logger logger = LoggerFactory.getLogger(MasterExecService.class);
+
+    /**
+     * master exec service
+     */
+    private final ThreadPoolExecutor execService;
+
+    private final ListeningExecutorService listeningExecutorService;
+
+    /**
+     * start process failed map
+     */
+    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
+
+    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> filterMap = new ConcurrentHashMap<>();
+
+    public MasterExecService(ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap,ThreadPoolExecutor execService) {
+        this.startProcessFailedMap = startProcessFailedMap;
+        this.execService = execService;
+        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+    }
+
+    public void execute(WorkflowExecuteThread workflowExecuteThread) {
+        if (workflowExecuteThread == null
+                || workflowExecuteThread.getProcessInstance() == null
+                || workflowExecuteThread.isStart()
+                || filterMap.containsKey(workflowExecuteThread.getProcessInstance().getId())) {
+            return;
+        }
+        Integer processInstanceId = workflowExecuteThread.getProcessInstance().getId();
+        filterMap.put(processInstanceId, workflowExecuteThread);
+        ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
+        FutureCallback futureCallback = new FutureCallback() {
+            @Override
+            public void onSuccess(Object o) {
+                if (!workflowExecuteThread.isStart()) {
+                    startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
+                } else {
+                    startProcessFailedMap.remove(processInstanceId);
+                }
+                filterMap.remove(processInstanceId);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error("handle events {} failed", processInstanceId, throwable);
+                if (!workflowExecuteThread.isStart()) {
+                    startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
+                } else {
+                    startProcessFailedMap.remove(processInstanceId);
+                }
+                filterMap.remove(processInstanceId);
+            }
+        };
+        Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+    }
+
+    public void shutdown() {
+        this.execService.shutdown();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return this.execService.awaitTermination(timeout, unit);
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 33c84b3..79cce4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -58,6 +59,12 @@ public class MasterSchedulerService extends Thread {
     private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
 
     /**
+     * handle task event
+     */
+    @Autowired
+    private TaskResponseService taskResponseService;
+
+    /**
      * dolphinscheduler database interface
      */
     @Autowired
@@ -92,7 +99,12 @@ public class MasterSchedulerService extends Thread {
     /**
      * master exec service
      */
-    private ThreadPoolExecutor masterExecService;
+    private MasterExecService masterExecService;
+
+    /**
+     * start process failed map
+     */
+    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap = new ConcurrentHashMap<>();
 
     /**
      * process instance execution list
@@ -126,11 +138,15 @@ public class MasterSchedulerService extends Thread {
      */
     public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
         this.processInstanceExecMaps = processInstanceExecMaps;
-        this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
+        this.masterExecService = new MasterExecService(this.startProcessFailedMap,
+                (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()));
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
 
-        stateWheelExecuteThread = new StateWheelExecuteThread(processService,
+        stateWheelExecuteThread = new StateWheelExecuteThread(
+                masterExecService,
+                processService,
+                startProcessFailedMap,
                 processTimeoutCheckList,
                 taskTimeoutCheckList,
                 taskRetryCheckList,
@@ -202,6 +218,7 @@ public class MasterSchedulerService extends Thread {
                 if (processInstance != null) {
                     WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                             processInstance
+                            , taskResponseService
                             , processService
                             , nettyExecutorManager
                             , processAlertManager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index de6db1d..e12be0c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -49,15 +49,30 @@ public class StateWheelExecuteThread extends Thread {
     private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
     private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
 
+    /**
+     * start process failed map
+     */
+    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
+
     private int stateCheckIntervalSecs;
 
-    public StateWheelExecuteThread(ProcessService processService,
-                                   ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
-                                   ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
-                                   ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
-                                   ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
-                                   int stateCheckIntervalSecs) {
+    /**
+     * master exec service
+     */
+    private MasterExecService masterExecService;
+
+    public StateWheelExecuteThread(
+            MasterExecService masterExecService,
+            ProcessService processService,
+            ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap,
+            ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
+            ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
+            ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
+            ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
+            int stateCheckIntervalSecs) {
+        this.masterExecService = masterExecService;
         this.processService = processService;
+        this.startProcessFailedMap = startProcessFailedMap;
         this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
         this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
         this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
@@ -71,6 +86,7 @@ public class StateWheelExecuteThread extends Thread {
         logger.info("state wheel thread start");
         while (Stopper.isRunning()) {
             try {
+                check4StartProcessFailed();
                 checkTask4Timeout();
                 checkTask4Retry();
                 checkProcess4Timeout();
@@ -176,4 +192,12 @@ public class StateWheelExecuteThread extends Thread {
         workflowExecuteThread.addStateEvent(stateEvent);
     }
 
+    private void check4StartProcessFailed() {
+        if (startProcessFailedMap.isEmpty()) {
+            return;
+        }
+        for (WorkflowExecuteThread workflowExecuteThread : this.startProcessFailedMap.values()) {
+            masterExecService.execute(workflowExecuteThread);
+        }
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 92862ae..5988839 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@@ -104,6 +106,11 @@ public class WorkflowExecuteThread implements Runnable {
     private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
 
     /**
+     * handle task event
+     */
+    private TaskResponseService taskResponseService;
+
+    /**
      * process instance
      */
     private ProcessInstance processInstance;
@@ -205,6 +212,7 @@ public class WorkflowExecuteThread implements Runnable {
      * @param nettyExecutorManager nettyExecutorManager
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
+            , TaskResponseService taskResponseService
             , ProcessService processService
             , NettyExecutorManager nettyExecutorManager
             , ProcessAlertManager processAlertManager
@@ -212,7 +220,7 @@ public class WorkflowExecuteThread implements Runnable {
             , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
             , ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList) {
         this.processService = processService;
-
+        this.taskResponseService = taskResponseService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
@@ -1249,13 +1257,12 @@ public class WorkflowExecuteThread implements Runnable {
             }
             ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
             taskProcessor.action(TaskAction.STOP);
-            if (taskProcessor.taskState().typeIsFinished()) {
-                StateEvent stateEvent = new StateEvent();
-                stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-                stateEvent.setProcessInstanceId(this.processInstance.getId());
-                stateEvent.setTaskInstanceId(taskInstance.getId());
-                stateEvent.setExecutionStatus(taskProcessor.taskState());
-                this.addStateEvent(stateEvent);
+            if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) {
+                TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(
+                        taskProcessor.taskState(),
+                        taskInstance.getId(),
+                        this.processInstance.getId());
+                taskResponseService.addResponse(taskResponseEvent);
             }
         }
     }
@@ -1420,4 +1427,8 @@ public class WorkflowExecuteThread implements Runnable {
                                       TaskDependType depNodeType) throws Exception {
         return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
     }
+
+    public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
+        return activeTaskProcessorMaps;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 5532477..4446485 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -83,6 +83,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
     protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
 
     /**
+     * persist task
+     *
+     * @return
+     */
+    protected abstract boolean persistTask(TaskAction taskAction);
+
+    /**
      * pause task, common tasks donot need this.
      *
      * @return
@@ -102,6 +109,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
      */
     protected abstract boolean taskTimeout();
 
+    /**
+     * persist
+     *
+     * @return
+     */
+    @Override
+    public boolean persist(TaskAction taskAction) {
+        return persistTask(taskAction);
+    }
+
     @Override
     public void run() {
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 23988f9..14bb3af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -87,6 +87,24 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean persistTask(TaskAction taskAction) {
+        switch (taskAction) {
+            case STOP:
+                if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+                    return true;
+                }
+                taskInstance.setState(ExecutionStatus.KILL);
+                taskInstance.setEndTime(new Date());
+                processService.updateTaskInstance(taskInstance);
+                return true;
+            default:
+                logger.error("unknown task action: {}", taskAction.toString());
+
+        }
+        return false;
+    }
+
     /**
      * common task cannot be paused
      */
@@ -154,7 +172,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             if (StringUtils.isBlank(taskInstance.getHost())) {
                 taskInstance.setState(ExecutionStatus.KILL);
                 taskInstance.setEndTime(new Date());
-                processService.updateTaskInstance(taskInstance);
                 return true;
             }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7c593b0..584e484 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -134,10 +134,26 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
+    protected boolean persistTask(TaskAction taskAction) {
+        switch (taskAction) {
+            case STOP:
+                if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+                    return true;
+                }
+                this.taskInstance.setState(ExecutionStatus.KILL);
+                this.taskInstance.setEndTime(new Date());
+                processService.saveTaskInstance(taskInstance);
+                return true;
+            default:
+                logger.error("unknown task action: {}", taskAction.toString());
+        }
+        return false;
+    }
+
+    @Override
     protected boolean killTask() {
         this.taskInstance.setState(ExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
         return true;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index b26e641..0f84a5f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -156,10 +155,27 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
+    protected boolean persistTask(TaskAction taskAction) {
+        switch (taskAction) {
+            case STOP:
+                if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+                    return true;
+                }
+                this.taskInstance.setState(ExecutionStatus.KILL);
+                this.taskInstance.setEndTime(new Date());
+                processService.saveTaskInstance(taskInstance);
+                return true;
+            default:
+                logger.error("unknown task action: {}", taskAction.toString());
+
+        }
+        return false;
+    }
+
+    @Override
     protected boolean killTask() {
         this.taskInstance.setState(ExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
         return true;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index b68dc22..aa1e490 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
  */
 public interface ITaskProcessor {
 
+    boolean persist(TaskAction taskAction);
+
     void run();
 
     boolean action(TaskAction taskAction);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index e0cd3e8..02f08a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -114,6 +114,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
+    protected boolean persistTask(TaskAction taskAction) {
+        switch (taskAction) {
+            case STOP:
+                return true;
+            default:
+                logger.error("unknown task action: {}", taskAction.toString());
+        }
+        return false;
+    }
+
+    @Override
     protected boolean pauseTask() {
         pauseSubWorkFlow();
         return true;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 8e9316f..c48a711 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -98,6 +98,24 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     }
 
     @Override
+    protected boolean persistTask(TaskAction taskAction) {
+        switch (taskAction) {
+            case STOP:
+                if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+                    return true;
+                }
+                this.taskInstance.setState(ExecutionStatus.KILL);
+                this.taskInstance.setEndTime(new Date());
+                processService.saveTaskInstance(taskInstance);
+                return true;
+            default:
+                logger.error("unknown task action: {}", taskAction.toString());
+
+        }
+        return false;
+    }
+
+    @Override
     protected boolean pauseTask() {
         this.taskInstance.setState(ExecutionStatus.PAUSE);
         this.taskInstance.setEndTime(new Date());
@@ -109,7 +127,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     protected boolean killTask() {
         this.taskInstance.setState(ExecutionStatus.KILL);
         this.taskInstance.setEndTime(new Date());
-        processService.saveTaskInstance(taskInstance);
         return true;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 4341b8c..4f235ea 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
@@ -118,6 +120,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         try {
             Integer processId = taskExecutionContext.getProcessId();
             if (processId.equals(0)) {
+                TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
+                if (null != taskExecuteThread) {
+                    AbstractTask task = taskExecuteThread.getTask();
+                    if (task != null) {
+                        task.cancelApplication(true);
+                        logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+                    }
+                }
                 workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
                 TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
                 logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
@@ -165,6 +175,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
             taskKillResponseCommand.setHost(taskExecutionContext.getHost());
             taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+            taskKillResponseCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
         }
         return taskKillResponseCommand;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 0b18dcf..4b9f99c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -389,4 +389,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
         }
         taskExecutionContext.setParamsMap(paramsMap);
     }
+
+    public AbstractTask getTask() {
+        return task;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
new file mode 100644
index 0000000..b980246
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class WorkerExecService {
+    /**
+     * logger of WorkerExecService
+     */
+    private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
+
+    private final ListeningExecutorService listeningExecutorService;
+
+    /**
+     * thread executor service
+     */
+    private final ExecutorService execService;
+
+    /**
+     * running task
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+
+    public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+        this.execService = execService;
+        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+        this.taskExecuteThreadMap = taskExecuteThreadMap;
+    }
+
+    public void submit(TaskExecuteThread taskExecuteThread) {
+        taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
+        ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
+        FutureCallback futureCallback = new FutureCallback() {
+            @Override
+            public void onSuccess(Object o) {
+                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
+                        , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
+                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
+            }
+        };
+        Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+    }
+
+    /**
+     * get thread pool queue size
+     *
+     * @return queue size
+     */
+    public int getThreadPoolQueueSize() {
+        return ((ThreadPoolExecutor) this.execService).getQueue().size();
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 4f68166..0deab9e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -31,9 +31,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +52,11 @@ public class WorkerManagerThread implements Runnable {
     private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
 
     /**
+     * running task
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
+    /**
      * worker config
      */
     private final WorkerConfig workerConfig;
@@ -60,7 +64,7 @@ public class WorkerManagerThread implements Runnable {
     /**
      * thread executor service
      */
-    private final ExecutorService workerExecService;
+    private final WorkerExecService workerExecService;
 
     /**
      * task callback service
@@ -69,10 +73,17 @@ public class WorkerManagerThread implements Runnable {
 
     public WorkerManagerThread() {
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
-        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
+        this.workerExecService = new WorkerExecService(
+                ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()),
+                taskExecuteThreadMap
+        );
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
     }
 
+    public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
+        return this.taskExecuteThreadMap.get(taskInstanceId);
+    }
+
     /**
      * get delay queue size
      *
@@ -88,7 +99,7 @@ public class WorkerManagerThread implements Runnable {
      * @return queue size
      */
     public int getThreadPoolQueueSize() {
-        return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+        return this.workerExecService.getThreadPoolQueueSize();
     }
 
     /**
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index bf527d2..35520c5 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -102,7 +102,7 @@ public class WorkflowExecuteThreadTest {
 
         ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
         ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
+        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
         // prepareProcess init dag
         Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
index c7f0475..8bef045 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
@@ -20,19 +20,26 @@ package org.apache.dolphinscheduler.server.master.processor;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import io.netty.channel.Channel;
 
 /**
  *  task response processor test
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class})
 public class TaskKillResponseProcessorTest {
 
     private TaskKillResponseProcessor taskKillResponseProcessor;
@@ -41,8 +48,14 @@ public class TaskKillResponseProcessorTest {
 
     private Channel channel;
 
+    private TaskResponseService taskResponseService;
+
     @Before
     public void before() {
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+
+        taskResponseService = PowerMockito.mock(TaskResponseService.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
         taskKillResponseProcessor = new TaskKillResponseProcessor();
         channel = PowerMockito.mock(Channel.class);
         taskKillResponseCommand = new TaskKillResponseCommand();