You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/03/31 06:13:38 UTC

[dolphinscheduler] branch dev updated: [Improvement-9288][Master] add task event thread pool (#9293)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 52ba2c6  [Improvement-9288][Master] add task event thread pool (#9293)
52ba2c6 is described below

commit 52ba2c6475042ad2bd65d2987b7a2e2d23ddd8fa
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Mar 31 14:13:33 2022 +0800

    [Improvement-9288][Master] add task event thread pool (#9293)
    
    * add task event thread
    
    * license heander
    
    * ci test
    
    * delete unuse file
    
    * fix CI test
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../master/processor/queue/TaskEventService.java   | 193 ++++-----------------
 ...askEventService.java => TaskExecuteThread.java} | 131 +++++---------
 .../processor/queue/TaskExecuteThreadPool.java     | 138 +++++++++++++++
 .../processor/queue/TaskResponseServiceTest.java   |   5 +-
 4 files changed, 220 insertions(+), 247 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index 4984390..924dd13 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -17,24 +17,14 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -44,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import io.netty.channel.Channel;
-
 /**
  * task manager
  */
@@ -63,45 +51,38 @@ public class TaskEventService {
     private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
 
     /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    /**
-     * data quality result operator
-     */
-    @Autowired
-    private DataQualityResultOperator dataQualityResultOperator;
-
-    /**
      * task event worker
      */
-    private Thread taskEventWorker;
+    private Thread taskEventThread;
 
-    @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+    private Thread taskEventHandlerThread;
 
     @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private TaskExecuteThreadPool taskExecuteThreadPool;
 
     @PostConstruct
     public void start() {
-        this.taskEventWorker = new TaskEventWorker();
-        this.taskEventWorker.setName("TaskStateEventWorker");
-        this.taskEventWorker.start();
+        this.taskEventThread = new TaskEventThread();
+        this.taskEventThread.setName("TaskEventThread");
+        this.taskEventThread.start();
+
+        this.taskEventHandlerThread = new TaskEventHandlerThread();
+        this.taskEventHandlerThread.setName("TaskEventHandlerThread");
+        this.taskEventHandlerThread.start();
     }
 
     @PreDestroy
     public void stop() {
         try {
-            this.taskEventWorker.interrupt();
+            this.taskEventThread.interrupt();
+            this.taskEventHandlerThread.interrupt();
             if (!eventQueue.isEmpty()) {
                 List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
                 eventQueue.drainTo(remainEvents);
-                for (TaskEvent event : remainEvents) {
-                    this.persist(event);
+                for (TaskEvent taskEvent : remainEvents) {
+                    taskExecuteThreadPool.submitTaskEvent(taskEvent);
                 }
+                taskExecuteThreadPool.eventHandler();
             }
         } catch (Exception e) {
             logger.error("stop error:", e);
@@ -109,32 +90,25 @@ public class TaskEventService {
     }
 
     /**
-     * add event to queue
+     * add event
      *
      * @param taskEvent taskEvent
      */
     public void addEvent(TaskEvent taskEvent) {
-        try {
-            eventQueue.put(taskEvent);
-        } catch (InterruptedException e) {
-            logger.error("add task event : {} error :{}", taskEvent, e);
-            Thread.currentThread().interrupt();
-        }
+        taskExecuteThreadPool.submitTaskEvent(taskEvent);
     }
 
     /**
      * task worker thread
      */
-    class TaskEventWorker extends Thread {
-
+    class TaskEventThread extends Thread {
         @Override
         public void run() {
-
             while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
                     TaskEvent taskEvent = eventQueue.take();
-                    persist(taskEvent);
+                    taskExecuteThreadPool.submitTaskEvent(taskEvent);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
@@ -147,121 +121,24 @@ public class TaskEventService {
     }
 
     /**
-     * persist task event
-     *
-     * @param taskEvent taskEvent
-     */
-    private void persist(TaskEvent taskEvent) {
-        Event event = taskEvent.getEvent();
-        int taskInstanceId = taskEvent.getTaskInstanceId();
-        int processInstanceId = taskEvent.getProcessInstanceId();
-
-        TaskInstance taskInstance;
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-        if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
-            taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
-        } else {
-            taskInstance = processService.findTaskInstanceById(taskInstanceId);
-        }
-
-        switch (event) {
-            case DISPATCH:
-                handleDispatchEvent(taskEvent, taskInstance);
-                // dispatch event do not need to submit state event
-                return;
-            case DELAY:
-            case RUNNING:
-                handleRunningEvent(taskEvent, taskInstance);
-                break;
-            case RESULT:
-                handleResultEvent(taskEvent, taskInstance);
-                break;
-            default:
-                throw new IllegalArgumentException("invalid event type : " + event);
-        }
-
-        StateEvent stateEvent = new StateEvent();
-        stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
-        stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
-        stateEvent.setExecutionStatus(taskEvent.getState());
-        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
-    /**
-     * handle dispatch event
+     * event handler thread
      */
-    private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        if (taskInstance == null) {
-            logger.error("taskInstance is null");
-            return;
-        }
-        if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
-            return;
-        }
-        taskInstance.setState(ExecutionStatus.DISPATCH);
-        taskInstance.setHost(taskEvent.getWorkerAddress());
-        processService.saveTaskInstance(taskInstance);
-    }
+    class TaskEventHandlerThread extends Thread {
 
-    /**
-     * handle running event
-     */
-    private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstance != null) {
-                if (taskInstance.getState().typeIsFinished()) {
-                    logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
-                } else {
-                    taskInstance.setState(taskEvent.getState());
-                    taskInstance.setStartTime(taskEvent.getStartTime());
-                    taskInstance.setHost(taskEvent.getWorkerAddress());
-                    taskInstance.setLogPath(taskEvent.getLogPath());
-                    taskInstance.setExecutePath(taskEvent.getExecutePath());
-                    taskInstance.setPid(taskEvent.getProcessId());
-                    taskInstance.setAppLink(taskEvent.getAppIds());
-                    processService.saveTaskInstance(taskInstance);
+        @Override
+        public void run() {
+            logger.info("event handler thread started");
+            while (Stopper.isRunning()) {
+                try {
+                    taskExecuteThreadPool.eventHandler();
+                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                } catch (Exception e) {
+                    logger.error("event handler thread error", e);
                 }
             }
-            // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker ack master error", e);
-            TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
-        }
-    }
-
-    /**
-     * handle result event
-     */
-    private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
-        Channel channel = taskEvent.getChannel();
-        try {
-            if (taskInstance != null) {
-                dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
-
-                taskInstance.setStartTime(taskEvent.getStartTime());
-                taskInstance.setHost(taskEvent.getWorkerAddress());
-                taskInstance.setLogPath(taskEvent.getLogPath());
-                taskInstance.setExecutePath(taskEvent.getExecutePath());
-                taskInstance.setPid(taskEvent.getProcessId());
-                taskInstance.setAppLink(taskEvent.getAppIds());
-                taskInstance.setState(taskEvent.getState());
-                taskInstance.setEndTime(taskEvent.getEndTime());
-                taskInstance.setVarPool(taskEvent.getVarPool());
-                processService.changeOutParam(taskInstance);
-                processService.saveTaskInstance(taskInstance);
-            }
-            // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
-        } catch (Exception e) {
-            logger.error("worker response master error", e);
-            TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
-            channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
similarity index 75%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
index 4984390..47b190e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
@@ -31,119 +30,77 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPoo
 import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
 
 /**
- * task manager
+ * task execute thread
  */
-@Component
-public class TaskEventService {
+public class TaskExecuteThread {
 
-    /**
-     * logger
-     */
-    private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
 
-    /**
-     * attemptQueue
-     */
-    private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
+    private final int processInstanceId;
 
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
+    private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();
 
-    /**
-     * data quality result operator
-     */
-    @Autowired
-    private DataQualityResultOperator dataQualityResultOperator;
+    private ProcessService processService;
 
-    /**
-     * task event worker
-     */
-    private Thread taskEventWorker;
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
-    @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
-    @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+    private DataQualityResultOperator dataQualityResultOperator;
 
-    @PostConstruct
-    public void start() {
-        this.taskEventWorker = new TaskEventWorker();
-        this.taskEventWorker.setName("TaskStateEventWorker");
-        this.taskEventWorker.start();
+    public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
+                             ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
+        this.processInstanceId = processInstanceId;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+        this.dataQualityResultOperator = dataQualityResultOperator;
     }
 
-    @PreDestroy
-    public void stop() {
-        try {
-            this.taskEventWorker.interrupt();
-            if (!eventQueue.isEmpty()) {
-                List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
-                eventQueue.drainTo(remainEvents);
-                for (TaskEvent event : remainEvents) {
-                    this.persist(event);
-                }
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, event:{}, error: {}", event, e);
+            } finally {
+                this.events.remove(event);
             }
-        } catch (Exception e) {
-            logger.error("stop error:", e);
         }
     }
 
-    /**
-     * add event to queue
-     *
-     * @param taskEvent taskEvent
-     */
-    public void addEvent(TaskEvent taskEvent) {
-        try {
-            eventQueue.put(taskEvent);
-        } catch (InterruptedException e) {
-            logger.error("add task event : {} error :{}", taskEvent, e);
-            Thread.currentThread().interrupt();
-        }
+    public String getKey() {
+        return String.valueOf(processInstanceId);
     }
 
-    /**
-     * task worker thread
-     */
-    class TaskEventWorker extends Thread {
+    public int eventSize() {
+        return this.events.size();
+    }
+
+    public boolean isEmpty() {
+        return this.events.isEmpty();
+    }
 
-        @Override
-        public void run() {
+    public Integer getProcessInstanceId() {
+        return processInstanceId;
+    }
 
-            while (Stopper.isRunning()) {
-                try {
-                    // if not task , blocking here
-                    TaskEvent taskEvent = eventQueue.take();
-                    persist(taskEvent);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    break;
-                } catch (Exception e) {
-                    logger.error("persist task error", e);
-                }
-            }
-            logger.info("StateEventResponseWorker stopped");
+    public boolean addEvent(TaskEvent event) {
+        if (event.getProcessInstanceId() != this.processInstanceId) {
+            logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
+                    event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
+            return false;
         }
+        return this.events.add(event);
     }
 
     /**
@@ -264,4 +221,4 @@ public class TaskEventService {
             channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
new file mode 100644
index 0000000..ccdab1b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+@Component
+public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
+
+    private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * data quality result operator
+     */
+    @Autowired
+    private DataQualityResultOperator dataQualityResultOperator;
+
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    /**
+     * task event thread map
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    private void init() {
+        this.setDaemon(true);
+        this.setThreadNamePrefix("Task-Execute-Thread-");
+        this.setMaxPoolSize(masterConfig.getExecThreads());
+        this.setCorePoolSize(masterConfig.getExecThreads());
+    }
+
+    public void submitTaskEvent(TaskEvent taskEvent) {
+        if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
+            logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+            return;
+        }
+        if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
+            TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
+                    taskEvent.getProcessInstanceId(),
+                    processService, workflowExecuteThreadPool,
+                    processInstanceExecCacheManager,
+                    dataQualityResultOperator);
+            taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
+        }
+        TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
+        if (taskExecuteThread != null) {
+            taskExecuteThread.addEvent(taskEvent);
+        }
+    }
+
+    public void eventHandler() {
+        for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
+            executeEvent(taskExecuteThread);
+        }
+    }
+
+    public void executeEvent(TaskExecuteThread taskExecuteThread) {
+        if (taskExecuteThread.eventSize() == 0) {
+            return;
+        }
+        if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
+            return;
+        }
+        ListenableFuture future = this.submitListenable(() -> {
+            taskExecuteThread.run();
+            multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+        });
+        future.addCallback(new ListenableFutureCallback() {
+            @Override
+            public void onFailure(Throwable ex) {
+                logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
+                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                }
+                multiThreadFilterMap.remove(taskExecuteThread.getKey());
+            }
+
+            @Override
+            public void onSuccess(Object result) {
+                logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
+                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                }
+                multiThreadFilterMap.remove(taskExecuteThread.getKey());
+            }
+        });
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 9a054b6..c5b00db 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -67,6 +67,9 @@ public class TaskResponseServiceTest {
     @Mock
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
+    @Mock
+    private TaskExecuteThreadPool taskExecuteThreadPool;
+
     @Before
     public void before() {
         taskEventService.start();
@@ -101,8 +104,6 @@ public class TaskResponseServiceTest {
 
     @Test
     public void testAddResponse() {
-        Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
-        Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
         taskEventService.addEvent(ackEvent);
         taskEventService.addEvent(resultEvent);
     }