You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/03/31 06:13:38 UTC
[dolphinscheduler] branch dev updated: [Improvement-9288][Master] add task event thread pool (#9293)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 52ba2c6 [Improvement-9288][Master] add task event thread pool (#9293)
52ba2c6 is described below
commit 52ba2c6475042ad2bd65d2987b7a2e2d23ddd8fa
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Thu Mar 31 14:13:33 2022 +0800
[Improvement-9288][Master] add task event thread pool (#9293)
* add task event thread
* license header
* ci test
* delete unused file
* fix CI test
Co-authored-by: caishunfeng <53...@qq.com>
---
.../master/processor/queue/TaskEventService.java | 193 ++++-----------------
...askEventService.java => TaskExecuteThread.java} | 131 +++++---------
.../processor/queue/TaskExecuteThreadPool.java | 138 +++++++++++++++
.../processor/queue/TaskResponseServiceTest.java | 5 +-
4 files changed, 220 insertions(+), 247 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index 4984390..924dd13 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -17,24 +17,14 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -44,8 +34,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import io.netty.channel.Channel;
-
/**
* task manager
*/
@@ -63,45 +51,38 @@ public class TaskEventService {
private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
/**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- /**
- * data quality result operator
- */
- @Autowired
- private DataQualityResultOperator dataQualityResultOperator;
-
- /**
* task event worker
*/
- private Thread taskEventWorker;
+ private Thread taskEventThread;
- @Autowired
- private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ private Thread taskEventHandlerThread;
@Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ private TaskExecuteThreadPool taskExecuteThreadPool;
@PostConstruct
public void start() {
- this.taskEventWorker = new TaskEventWorker();
- this.taskEventWorker.setName("TaskStateEventWorker");
- this.taskEventWorker.start();
+ this.taskEventThread = new TaskEventThread();
+ this.taskEventThread.setName("TaskEventThread");
+ this.taskEventThread.start();
+
+ this.taskEventHandlerThread = new TaskEventHandlerThread();
+ this.taskEventHandlerThread.setName("TaskEventHandlerThread");
+ this.taskEventHandlerThread.start();
}
@PreDestroy
public void stop() {
try {
- this.taskEventWorker.interrupt();
+ this.taskEventThread.interrupt();
+ this.taskEventHandlerThread.interrupt();
if (!eventQueue.isEmpty()) {
List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
- for (TaskEvent event : remainEvents) {
- this.persist(event);
+ for (TaskEvent taskEvent : remainEvents) {
+ taskExecuteThreadPool.submitTaskEvent(taskEvent);
}
+ taskExecuteThreadPool.eventHandler();
}
} catch (Exception e) {
logger.error("stop error:", e);
@@ -109,32 +90,25 @@ public class TaskEventService {
}
/**
- * add event to queue
+ * add event
*
* @param taskEvent taskEvent
*/
public void addEvent(TaskEvent taskEvent) {
- try {
- eventQueue.put(taskEvent);
- } catch (InterruptedException e) {
- logger.error("add task event : {} error :{}", taskEvent, e);
- Thread.currentThread().interrupt();
- }
+ taskExecuteThreadPool.submitTaskEvent(taskEvent);
}
/**
* task worker thread
*/
- class TaskEventWorker extends Thread {
-
+ class TaskEventThread extends Thread {
@Override
public void run() {
-
while (Stopper.isRunning()) {
try {
// if not task , blocking here
TaskEvent taskEvent = eventQueue.take();
- persist(taskEvent);
+ taskExecuteThreadPool.submitTaskEvent(taskEvent);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
@@ -147,121 +121,24 @@ public class TaskEventService {
}
/**
- * persist task event
- *
- * @param taskEvent taskEvent
- */
- private void persist(TaskEvent taskEvent) {
- Event event = taskEvent.getEvent();
- int taskInstanceId = taskEvent.getTaskInstanceId();
- int processInstanceId = taskEvent.getProcessInstanceId();
-
- TaskInstance taskInstance;
- WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
- if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
- taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
- } else {
- taskInstance = processService.findTaskInstanceById(taskInstanceId);
- }
-
- switch (event) {
- case DISPATCH:
- handleDispatchEvent(taskEvent, taskInstance);
- // dispatch event do not need to submit state event
- return;
- case DELAY:
- case RUNNING:
- handleRunningEvent(taskEvent, taskInstance);
- break;
- case RESULT:
- handleResultEvent(taskEvent, taskInstance);
- break;
- default:
- throw new IllegalArgumentException("invalid event type : " + event);
- }
-
- StateEvent stateEvent = new StateEvent();
- stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
- stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
- stateEvent.setExecutionStatus(taskEvent.getState());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
- /**
- * handle dispatch event
+ * event handler thread
*/
- private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
- if (taskInstance == null) {
- logger.error("taskInstance is null");
- return;
- }
- if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
- return;
- }
- taskInstance.setState(ExecutionStatus.DISPATCH);
- taskInstance.setHost(taskEvent.getWorkerAddress());
- processService.saveTaskInstance(taskInstance);
- }
+ class TaskEventHandlerThread extends Thread {
- /**
- * handle running event
- */
- private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
- Channel channel = taskEvent.getChannel();
- try {
- if (taskInstance != null) {
- if (taskInstance.getState().typeIsFinished()) {
- logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
- } else {
- taskInstance.setState(taskEvent.getState());
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- processService.saveTaskInstance(taskInstance);
+ @Override
+ public void run() {
+ logger.info("event handler thread started");
+ while (Stopper.isRunning()) {
+ try {
+ taskExecuteThreadPool.eventHandler();
+ TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Exception e) {
+ logger.error("event handler thread error", e);
}
}
- // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker ack master error", e);
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
- }
- }
-
- /**
- * handle result event
- */
- private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
- Channel channel = taskEvent.getChannel();
- try {
- if (taskInstance != null) {
- dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
-
- taskInstance.setStartTime(taskEvent.getStartTime());
- taskInstance.setHost(taskEvent.getWorkerAddress());
- taskInstance.setLogPath(taskEvent.getLogPath());
- taskInstance.setExecutePath(taskEvent.getExecutePath());
- taskInstance.setPid(taskEvent.getProcessId());
- taskInstance.setAppLink(taskEvent.getAppIds());
- taskInstance.setState(taskEvent.getState());
- taskInstance.setEndTime(taskEvent.getEndTime());
- taskInstance.setVarPool(taskEvent.getVarPool());
- processService.changeOutParam(taskInstance);
- processService.saveTaskInstance(taskInstance);
- }
- // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
- } catch (Exception e) {
- logger.error("worker response master error", e);
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
similarity index 75%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
index 4984390..47b190e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
@@ -31,119 +30,77 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPoo
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/**
- * task manager
+ * task execute thread
*/
-@Component
-public class TaskEventService {
+public class TaskExecuteThread {
- /**
- * logger
- */
- private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
+ private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
- /**
- * attemptQueue
- */
- private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
+ private final int processInstanceId;
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
+ private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();
- /**
- * data quality result operator
- */
- @Autowired
- private DataQualityResultOperator dataQualityResultOperator;
+ private ProcessService processService;
- /**
- * task event worker
- */
- private Thread taskEventWorker;
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
- @Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
- @Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ private DataQualityResultOperator dataQualityResultOperator;
- @PostConstruct
- public void start() {
- this.taskEventWorker = new TaskEventWorker();
- this.taskEventWorker.setName("TaskStateEventWorker");
- this.taskEventWorker.start();
+ public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
+ ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
+ this.processInstanceId = processInstanceId;
+ this.processService = processService;
+ this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+ this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+ this.dataQualityResultOperator = dataQualityResultOperator;
}
- @PreDestroy
- public void stop() {
- try {
- this.taskEventWorker.interrupt();
- if (!eventQueue.isEmpty()) {
- List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
- eventQueue.drainTo(remainEvents);
- for (TaskEvent event : remainEvents) {
- this.persist(event);
- }
+ public void run() {
+ while (!this.events.isEmpty()) {
+ TaskEvent event = this.events.peek();
+ try {
+ persist(event);
+ } catch (Exception e) {
+ logger.error("persist error, event:{}, error: {}", event, e);
+ } finally {
+ this.events.remove(event);
}
- } catch (Exception e) {
- logger.error("stop error:", e);
}
}
- /**
- * add event to queue
- *
- * @param taskEvent taskEvent
- */
- public void addEvent(TaskEvent taskEvent) {
- try {
- eventQueue.put(taskEvent);
- } catch (InterruptedException e) {
- logger.error("add task event : {} error :{}", taskEvent, e);
- Thread.currentThread().interrupt();
- }
+ public String getKey() {
+ return String.valueOf(processInstanceId);
}
- /**
- * task worker thread
- */
- class TaskEventWorker extends Thread {
+ public int eventSize() {
+ return this.events.size();
+ }
+
+ public boolean isEmpty() {
+ return this.events.isEmpty();
+ }
- @Override
- public void run() {
+ public Integer getProcessInstanceId() {
+ return processInstanceId;
+ }
- while (Stopper.isRunning()) {
- try {
- // if not task , blocking here
- TaskEvent taskEvent = eventQueue.take();
- persist(taskEvent);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- } catch (Exception e) {
- logger.error("persist task error", e);
- }
- }
- logger.info("StateEventResponseWorker stopped");
+ public boolean addEvent(TaskEvent event) {
+ if (event.getProcessInstanceId() != this.processInstanceId) {
+ logger.warn("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
+ event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
+ return false;
}
+ return this.events.add(event);
}
/**
@@ -264,4 +221,4 @@ public class TaskEventService {
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
new file mode 100644
index 0000000..ccdab1b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+@Component
+public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
+
+ private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
+
+ private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
+
+ @Autowired
+ private MasterConfig masterConfig;
+
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+ /**
+ * process service
+ */
+ @Autowired
+ private ProcessService processService;
+
+ /**
+ * data quality result operator
+ */
+ @Autowired
+ private DataQualityResultOperator dataQualityResultOperator;
+
+
+ @Autowired
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+ /**
+ * task event thread map
+ */
+ private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
+ @PostConstruct
+ private void init() {
+ this.setDaemon(true);
+ this.setThreadNamePrefix("Task-Execute-Thread-");
+ this.setMaxPoolSize(masterConfig.getExecThreads());
+ this.setCorePoolSize(masterConfig.getExecThreads());
+ }
+
+ public void submitTaskEvent(TaskEvent taskEvent) {
+ if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
+ logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+ return;
+ }
+ if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
+ taskEvent.getProcessInstanceId(),
+ processService, workflowExecuteThreadPool,
+ processInstanceExecCacheManager,
+ dataQualityResultOperator);
+ taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
+ }
+ TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
+ if (taskExecuteThread != null) {
+ taskExecuteThread.addEvent(taskEvent);
+ }
+ }
+
+ public void eventHandler() {
+ for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
+ executeEvent(taskExecuteThread);
+ }
+ }
+
+ public void executeEvent(TaskExecuteThread taskExecuteThread) {
+ if (taskExecuteThread.eventSize() == 0) {
+ return;
+ }
+ if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
+ return;
+ }
+ ListenableFuture future = this.submitListenable(() -> {
+ taskExecuteThread.run();
+ multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
+ });
+ future.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onFailure(Throwable ex) {
+ logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
+ if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+ taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+ logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+ }
+ multiThreadFilterMap.remove(taskExecuteThread.getKey());
+ }
+
+ @Override
+ public void onSuccess(Object result) {
+ logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
+ if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
+ taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
+ logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+ }
+ multiThreadFilterMap.remove(taskExecuteThread.getKey());
+ }
+ });
+ }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 9a054b6..c5b00db 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -67,6 +67,9 @@ public class TaskResponseServiceTest {
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+ @Mock
+ private TaskExecuteThreadPool taskExecuteThreadPool;
+
@Before
public void before() {
taskEventService.start();
@@ -101,8 +104,6 @@ public class TaskResponseServiceTest {
@Test
public void testAddResponse() {
- Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
- Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
taskEventService.addEvent(ackEvent);
taskEventService.addEvent(resultEvent);
}