You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/01/10 10:47:31 UTC
[dolphinscheduler] branch 2.0.3-prepare updated: [dolphinscheduler-server] process instance is always running (#7914)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
new 8dc529d [dolphinscheduler-server] process instance is always running (#7914)
8dc529d is described below
commit 8dc529d35f19514123902daf1ad7b0e352988d87
Author: zwZjut <zw...@163.com>
AuthorDate: Mon Jan 10 18:47:26 2022 +0800
[dolphinscheduler-server] process instance is always running (#7914)
* #7698
* #7698
* #7698
* fix workflow is always running
* fix workflow is always running
* fix workflow is always running
* #7700 #7698
* add license
* fix ut
* #7700 #7698
Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
.../dolphinscheduler/common/enums/Event.java | 3 +-
.../remote/command/TaskKillResponseCommand.java | 14 +++
.../server/master/MasterServer.java | 4 +-
.../processor/TaskKillResponseProcessor.java | 26 ++++++
.../master/processor/queue/TaskResponseEvent.java | 15 ++-
.../processor/queue/TaskResponsePersistThread.java | 12 +++
.../processor/queue/TaskResponseService.java | 12 ++-
.../server/master/runner/MasterExecService.java | 102 +++++++++++++++++++++
.../master/runner/MasterSchedulerService.java | 23 ++++-
.../master/runner/StateWheelExecuteThread.java | 36 ++++++--
.../master/runner/WorkflowExecuteThread.java | 27 ++++--
.../master/runner/task/BaseTaskProcessor.java | 17 ++++
.../master/runner/task/CommonTaskProcessor.java | 19 +++-
.../master/runner/task/ConditionTaskProcessor.java | 18 +++-
.../master/runner/task/DependentTaskProcessor.java | 20 +++-
.../server/master/runner/task/ITaskProcessor.java | 2 +
.../master/runner/task/SubTaskProcessor.java | 11 +++
.../master/runner/task/SwitchTaskProcessor.java | 19 +++-
.../server/worker/processor/TaskKillProcessor.java | 11 +++
.../server/worker/runner/TaskExecuteThread.java | 4 +
.../server/worker/runner/WorkerExecService.java | 85 +++++++++++++++++
.../server/worker/runner/WorkerManagerThread.java | 21 ++++-
.../server/master/WorkflowExecuteThreadTest.java | 2 +-
.../processor/TaskKillResponseProcessorTest.java | 13 +++
24 files changed, 483 insertions(+), 33 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 9cec276..5adfe63 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -19,5 +19,6 @@ package org.apache.dolphinscheduler.common.enums;
public enum Event {
ACK,
- RESULT;
+ RESULT,
+ ACTION_STOP;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
index 03ad4dd..98fbe0f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
@@ -49,6 +49,11 @@ public class TaskKillResponseCommand implements Serializable {
private int processId;
/**
+ * process instance id
+ */
+ private int processInstanceId;
+
+ /**
* other resource manager appId , for example : YARN etc
*/
private List<String> appIds;
@@ -85,6 +90,14 @@ public class TaskKillResponseCommand implements Serializable {
this.processId = processId;
}
+ public int getProcessInstanceId() {
+ return processInstanceId;
+ }
+
+ public void setProcessInstanceId(int processInstanceId) {
+ this.processInstanceId = processInstanceId;
+ }
+
public List<String> getAppIds() {
return appIds;
}
@@ -114,6 +127,7 @@ public class TaskKillResponseCommand implements Serializable {
+ ", status=" + status
+ ", processId=" + processId
+ ", appIds=" + appIds
+ + ", processInstanceId=" + processInstanceId
+ '}';
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index a5d7cf4..a6642a1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -138,11 +138,13 @@ public class MasterServer implements IStoppable {
ackProcessor.init(processInstanceExecMaps);
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
+ TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor();
+ taskKillResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 28f18fe..36dde29 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -17,11 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +45,19 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
/**
+ * task response service
+ */
+ private final TaskResponseService taskResponseService;
+
+ public TaskKillResponseProcessor() {
+ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
+ }
+
+ public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
+ this.taskResponseService.init(processInstanceExecMaps);
+ }
+
+ /**
* task final result response
* need master process , state persistence
*
@@ -50,6 +70,12 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
logger.info("received task kill response command : {}", responseCommand);
+ // TaskResponseEvent
+ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(ExecutionStatus.of(responseCommand.getStatus()),
+ responseCommand.getTaskInstanceId(),
+ responseCommand.getProcessInstanceId()
+ );
+ taskResponseService.addResponse(taskResponseEvent);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 224a617..f2a080c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import com.fasterxml.jackson.annotation.JsonFormat;
-
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
+import com.fasterxml.jackson.annotation.JsonFormat;
+
import io.netty.channel.Channel;
/**
@@ -94,6 +94,17 @@ public class TaskResponseEvent {
private Channel channel;
private int processInstanceId;
+
+ public static TaskResponseEvent newActionStop(ExecutionStatus state,
+ int taskInstanceId,
+ int processInstanceId) {
+ TaskResponseEvent event = new TaskResponseEvent();
+ event.setState(state);
+ event.setTaskInstanceId(taskInstanceId);
+ event.setEvent(Event.ACTION_STOP);
+ event.setProcessInstanceId(processInstanceId);
+ return event;
+ }
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index 621dd79..ca8ad0a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.concurrent.ConcurrentHashMap;
@@ -146,6 +148,16 @@ public class TaskResponsePersistThread implements Runnable {
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
break;
+ case ACTION_STOP:
+ WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
+ if (workflowExecuteThread != null) {
+ ITaskProcessor taskProcessor = workflowExecuteThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId());
+ if (taskProcessor != null) {
+ taskProcessor.persist(TaskAction.STOP);
+ logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
+ }
+ }
+ break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 5ef2350..b5e70ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -115,10 +115,20 @@ public class TaskResponseService {
try {
this.taskResponseWorker.interrupt();
this.taskResponseEventHandler.interrupt();
- this.eventExecService.shutdown();
} catch (Exception e) {
logger.error("stop error:", e);
}
+ this.eventExecService.shutdown();
+ long waitSec = 5;
+ boolean terminated = false;
+ try {
+ terminated = eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ if (!terminated) {
+ logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", waitSec);
+ }
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java
new file mode 100644
index 0000000..edb5e66
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class MasterExecService {
+
+ /**
+ * logger of MasterExecService
+ */
+ private static final Logger logger = LoggerFactory.getLogger(MasterExecService.class);
+
+ /**
+ * master exec service
+ */
+ private final ThreadPoolExecutor execService;
+
+ private final ListeningExecutorService listeningExecutorService;
+
+ /**
+ * start process failed map
+ */
+ private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
+
+ private final ConcurrentHashMap<Integer, WorkflowExecuteThread> filterMap = new ConcurrentHashMap<>();
+
+ public MasterExecService(ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap,ThreadPoolExecutor execService) {
+ this.startProcessFailedMap = startProcessFailedMap;
+ this.execService = execService;
+ this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+ }
+
+ public void execute(WorkflowExecuteThread workflowExecuteThread) {
+ if (workflowExecuteThread == null
+ || workflowExecuteThread.getProcessInstance() == null
+ || workflowExecuteThread.isStart()
+ || filterMap.containsKey(workflowExecuteThread.getProcessInstance().getId())) {
+ return;
+ }
+ Integer processInstanceId = workflowExecuteThread.getProcessInstance().getId();
+ filterMap.put(processInstanceId, workflowExecuteThread);
+ ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
+ FutureCallback futureCallback = new FutureCallback() {
+ @Override
+ public void onSuccess(Object o) {
+ if (!workflowExecuteThread.isStart()) {
+ startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
+ } else {
+ startProcessFailedMap.remove(processInstanceId);
+ }
+ filterMap.remove(processInstanceId);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("handle events {} failed", processInstanceId, throwable);
+ if (!workflowExecuteThread.isStart()) {
+ startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
+ } else {
+ startProcessFailedMap.remove(processInstanceId);
+ }
+ filterMap.remove(processInstanceId);
+ }
+ };
+ Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+ }
+
+ public void shutdown() {
+ this.execService.shutdown();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return this.execService.awaitTermination(timeout, unit);
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 33c84b3..79cce4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
@@ -58,6 +59,12 @@ public class MasterSchedulerService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/**
+ * handle task event
+ */
+ @Autowired
+ private TaskResponseService taskResponseService;
+
+ /**
* dolphinscheduler database interface
*/
@Autowired
@@ -92,7 +99,12 @@ public class MasterSchedulerService extends Thread {
/**
* master exec service
*/
- private ThreadPoolExecutor masterExecService;
+ private MasterExecService masterExecService;
+
+ /**
+ * start process failed map
+ */
+ private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap = new ConcurrentHashMap<>();
/**
* process instance execution list
@@ -126,11 +138,15 @@ public class MasterSchedulerService extends Thread {
*/
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.processInstanceExecMaps = processInstanceExecMaps;
- this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
+ this.masterExecService = new MasterExecService(this.startProcessFailedMap,
+ (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()));
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
- stateWheelExecuteThread = new StateWheelExecuteThread(processService,
+ stateWheelExecuteThread = new StateWheelExecuteThread(
+ masterExecService,
+ processService,
+ startProcessFailedMap,
processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
@@ -202,6 +218,7 @@ public class MasterSchedulerService extends Thread {
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
+ , taskResponseService
, processService
, nettyExecutorManager
, processAlertManager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index de6db1d..e12be0c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -49,15 +49,30 @@ public class StateWheelExecuteThread extends Thread {
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
+ /**
+ * start process failed map
+ */
+ private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
+
private int stateCheckIntervalSecs;
- public StateWheelExecuteThread(ProcessService processService,
- ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
- ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
- ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
- ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
- int stateCheckIntervalSecs) {
+ /**
+ * master exec service
+ */
+ private MasterExecService masterExecService;
+
+ public StateWheelExecuteThread(
+ MasterExecService masterExecService,
+ ProcessService processService,
+ ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap,
+ ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
+ ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
+ ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
+ ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
+ int stateCheckIntervalSecs) {
+ this.masterExecService = masterExecService;
this.processService = processService;
+ this.startProcessFailedMap = startProcessFailedMap;
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
@@ -71,6 +86,7 @@ public class StateWheelExecuteThread extends Thread {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
+ check4StartProcessFailed();
checkTask4Timeout();
checkTask4Retry();
checkProcess4Timeout();
@@ -176,4 +192,12 @@ public class StateWheelExecuteThread extends Thread {
workflowExecuteThread.addStateEvent(stateEvent);
}
+ private void check4StartProcessFailed() {
+ if (startProcessFailedMap.isEmpty()) {
+ return;
+ }
+ for (WorkflowExecuteThread workflowExecuteThread : this.startProcessFailedMap.values()) {
+ masterExecService.execute(workflowExecuteThread);
+ }
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 92862ae..5988839 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@@ -104,6 +106,11 @@ public class WorkflowExecuteThread implements Runnable {
private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
+ * handle task event
+ */
+ private TaskResponseService taskResponseService;
+
+ /**
* process instance
*/
private ProcessInstance processInstance;
@@ -205,6 +212,7 @@ public class WorkflowExecuteThread implements Runnable {
* @param nettyExecutorManager nettyExecutorManager
*/
public WorkflowExecuteThread(ProcessInstance processInstance
+ , TaskResponseService taskResponseService
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
@@ -212,7 +220,7 @@ public class WorkflowExecuteThread implements Runnable {
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList) {
this.processService = processService;
-
+ this.taskResponseService = taskResponseService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
@@ -1249,13 +1257,12 @@ public class WorkflowExecuteThread implements Runnable {
}
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
taskProcessor.action(TaskAction.STOP);
- if (taskProcessor.taskState().typeIsFinished()) {
- StateEvent stateEvent = new StateEvent();
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(this.processInstance.getId());
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setExecutionStatus(taskProcessor.taskState());
- this.addStateEvent(stateEvent);
+ if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) {
+ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(
+ taskProcessor.taskState(),
+ taskInstance.getId(),
+ this.processInstance.getId());
+ taskResponseService.addResponse(taskResponseEvent);
}
}
}
@@ -1420,4 +1427,8 @@ public class WorkflowExecuteThread implements Runnable {
TaskDependType depNodeType) throws Exception {
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
}
+
+ public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
+ return activeTaskProcessorMaps;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 5532477..4446485 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -83,6 +83,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
/**
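+ * persist the task instance state for the given action
+ *
+ * @return true if the action was handled, false if the action is not recognized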
+ */
+ protected abstract boolean persistTask(TaskAction taskAction);
+
+ /**
* pause task, common tasks donot need this.
*
* @return
@@ -102,6 +109,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean taskTimeout();
+ /**
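+ * persist the task state for the given action by delegating to persistTask
+ *
+ * @return the result of persistTask for the given action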
+ */
+ @Override
+ public boolean persist(TaskAction taskAction) {
+ return persistTask(taskAction);
+ }
+
@Override
public void run() {
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 23988f9..14bb3af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -87,6 +87,24 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean persistTask(TaskAction taskAction) {
+ switch (taskAction) {
+ case STOP:
+ if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+ return true;
+ }
+ taskInstance.setState(ExecutionStatus.KILL);
+ taskInstance.setEndTime(new Date());
+ processService.updateTaskInstance(taskInstance);
+ return true;
+ default:
+ logger.error("unknown task action: {}", taskAction.toString());
+
+ }
+ return false;
+ }
+
/**
* common task cannot be paused
*/
@@ -154,7 +172,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (StringUtils.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
- processService.updateTaskInstance(taskInstance);
return true;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 7c593b0..584e484 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -134,10 +134,26 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
+ protected boolean persistTask(TaskAction taskAction) {
+ switch (taskAction) {
+ case STOP:
+ if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
+ return true;
+ }
+ this.taskInstance.setState(ExecutionStatus.KILL);
+ this.taskInstance.setEndTime(new Date());
+ processService.saveTaskInstance(taskInstance);
+ return true;
+ default:
+ logger.error("unknown task action: {}", taskAction.toString());
+ }
+ return false;
+ }
+
+ @Override
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
return true;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index b26e641..0f84a5f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@@ -156,10 +155,27 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
+ protected boolean persistTask(TaskAction taskAction) {
+     // only STOP is handled here; anything else is reported and rejected
+     if (TaskAction.STOP != taskAction) {
+         logger.error("unknown task action: {}", taskAction.toString());
+         return false;
+     }
+     ExecutionStatus current = taskInstance.getState();
+     boolean alreadySettled = current.typeIsFinished() && !current.typeIsCancel();
+     if (!alreadySettled) {
+         // mark the task killed as of now and save the instance
+         taskInstance.setState(ExecutionStatus.KILL);
+         taskInstance.setEndTime(new Date());
+         processService.saveTaskInstance(taskInstance);
+     }
+     return true;
+ }
+
+ @Override
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
return true;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
index b68dc22..aa1e490 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
@@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
*/
public interface ITaskProcessor {
+ boolean persist(TaskAction taskAction);
+
void run();
boolean action(TaskAction taskAction);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index e0cd3e8..02f08a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -114,6 +114,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
+ protected boolean persistTask(TaskAction taskAction) {
+     // STOP writes nothing here and simply reports success
+     if (TaskAction.STOP == taskAction) {
+         return true;
+     }
+     // every other action is unsupported: log it and report failure
+     logger.error("unknown task action: {}", taskAction.toString());
+     return false;
+ }
+
+ @Override
protected boolean pauseTask() {
pauseSubWorkFlow();
return true;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 8e9316f..c48a711 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -98,6 +98,24 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
@Override
+ protected boolean persistTask(TaskAction taskAction) {
+     // STOP is the only action persisted by this processor
+     if (TaskAction.STOP != taskAction) {
+         logger.error("unknown task action: {}", taskAction.toString());
+         return false;
+     }
+     ExecutionStatus stateNow = taskInstance.getState();
+     boolean mustRecordKill = !stateNow.typeIsFinished() || stateNow.typeIsCancel();
+     if (mustRecordKill) {
+         // not finished yet, or finished in a cancel-type state: save it as killed now
+         taskInstance.setState(ExecutionStatus.KILL);
+         taskInstance.setEndTime(new Date());
+         processService.saveTaskInstance(taskInstance);
+     }
+     return true;
+ }
+
+ @Override
protected boolean pauseTask() {
this.taskInstance.setState(ExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
@@ -109,7 +127,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
- processService.saveTaskInstance(taskInstance);
return true;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 4341b8c..4f235ea 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@@ -118,6 +120,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
try {
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
+ TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
+ if (null != taskExecuteThread) {
+ AbstractTask task = taskExecuteThread.getTask();
+ if (task != null) {
+ task.cancelApplication(true);
+ logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+ }
+ }
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
@@ -165,6 +175,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ taskKillResponseCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
}
return taskKillResponseCommand;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 0b18dcf..4b9f99c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -389,4 +389,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
taskExecutionContext.setParamsMap(paramsMap);
}
+
+ public AbstractTask getTask() { // the kill processor in this patch null-checks the returned task
+     return this.task;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
new file mode 100644
index 0000000..b980246
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Runs {@link TaskExecuteThread}s on the worker pool and keeps each one in a shared map,
+ * keyed by task instance id, from submission until its completion callback has run.
+ */
+public class WorkerExecService {
+    /**
+     * logger of WorkerExecService
+     */
+    private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
+
+    /**
+     * listening decorator around {@link #execService}, used to attach completion callbacks
+     */
+    private final ListeningExecutorService listeningExecutorService;
+
+    /**
+     * thread executor service
+     */
+    private final ExecutorService execService;
+
+    /**
+     * running task
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+
+    /**
+     * @param execService executor that runs the submitted tasks
+     * @param taskExecuteThreadMap map shared with the caller; filled on submit, cleared on completion
+     */
+    public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+        this.execService = execService;
+        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+        this.taskExecuteThreadMap = taskExecuteThreadMap;
+    }
+
+    /**
+     * Registers the task under its task instance id and submits it to the pool. A completion
+     * callback, scheduled on the same executor, removes the entry on success or failure.
+     *
+     * @param taskExecuteThread task to run
+     * @throws RejectedExecutionException if the executor refuses the task; its map entry is removed first
+     */
+    public void submit(TaskExecuteThread taskExecuteThread) {
+        final Integer taskInstanceId = taskExecuteThread.getTaskExecutionContext().getTaskInstanceId();
+        // register before submitting so the removal callback can never run ahead of the put
+        taskExecuteThreadMap.put(taskInstanceId, taskExecuteThread);
+        final ListenableFuture<?> future;
+        try {
+            future = this.listeningExecutorService.submit(taskExecuteThread);
+        } catch (RejectedExecutionException e) {
+            // the task will never run, so no callback would ever clear its entry
+            taskExecuteThreadMap.remove(taskInstanceId, taskExecuteThread);
+            throw e;
+        }
+        FutureCallback<Object> futureCallback = new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(Object o) {
+                taskExecuteThreadMap.remove(taskInstanceId);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
+                    taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(), taskInstanceId, throwable);
+                taskExecuteThreadMap.remove(taskInstanceId);
+            }
+        };
+        Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+    }
+
+    /**
+     * get thread pool queue size
+     *
+     * @return queue size
+     * @throws ClassCastException if the wrapped executor is not a {@link ThreadPoolExecutor}
+     */
+    public int getThreadPoolQueueSize() {
+        return ((ThreadPoolExecutor) this.execService).getQueue().size();
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 4f68166..0deab9e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -31,9 +31,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +52,11 @@ public class WorkerManagerThread implements Runnable {
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
/**
+ * running task
+ */
+ private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
+ /**
* worker config
*/
private final WorkerConfig workerConfig;
@@ -60,7 +64,7 @@ public class WorkerManagerThread implements Runnable {
/**
* thread executor service
*/
- private final ExecutorService workerExecService;
+ private final WorkerExecService workerExecService;
/**
* task callback service
@@ -69,10 +73,17 @@ public class WorkerManagerThread implements Runnable {
public WorkerManagerThread() {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
- this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
+ this.workerExecService = new WorkerExecService( // the fixed worker pool is now wrapped instead of held directly
+ ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()),
+ taskExecuteThreadMap // same map instance that getTaskExecuteThread(...) reads
+ );
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
}
+ public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) { // null when nothing is registered under this id
+     return taskExecuteThreadMap.get(taskInstanceId);
+ }
+
/**
* get delay queue size
*
@@ -88,7 +99,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
- return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+ return this.workerExecService.getThreadPoolQueueSize(); // delegated: WorkerExecService now holds the executor
}
/**
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index bf527d2..35520c5 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -102,7 +102,7 @@ public class WorkflowExecuteThreadTest {
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
- workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
+ workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
index c7f0475..8bef045 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
@@ -20,19 +20,26 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* task response processor test
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class})
public class TaskKillResponseProcessorTest {
private TaskKillResponseProcessor taskKillResponseProcessor;
@@ -41,8 +48,14 @@ public class TaskKillResponseProcessorTest {
private Channel channel;
+ private TaskResponseService taskResponseService;
+
@Before
public void before() {
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+
+ taskResponseService = PowerMockito.mock(TaskResponseService.class);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
taskKillResponseProcessor = new TaskKillResponseProcessor();
channel = PowerMockito.mock(Channel.class);
taskKillResponseCommand = new TaskKillResponseCommand();