You are viewing a plain-text version of this content. The canonical link to it was provided as a hyperlink, which is not preserved in this plain-text version.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/20 14:46:26 UTC
[dolphinscheduler] branch dev updated: [Bug] cancel application when kill task (#9624)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 239be31ab7 [Bug] cancel application when kill task (#9624)
239be31ab7 is described below
commit 239be31ab732f5bbe68fca3033245125e8388657
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Apr 20 22:46:15 2022 +0800
[Bug] cancel application when kill task (#9624)
* cancel application when kill task
* add warn log
* add cancel application
---
.../server/worker/processor/TaskKillProcessor.java | 86 ++++++++++++----------
.../server/worker/runner/TaskExecuteThread.java | 4 +
.../server/worker/runner/WorkerExecService.java | 85 +++++++++++++++++++++
.../server/worker/runner/WorkerManagerThread.java | 19 ++++-
4 files changed, 154 insertions(+), 40 deletions(-)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 9873f1eaf6..ecc1ffe755 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
@@ -27,12 +28,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.log.LogClientService;
@@ -93,8 +94,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return;
}
- Integer processId = taskExecutionContext.getProcessId();
- if (processId.equals(0)) {
+ int processId = taskExecutionContext.getProcessId();
+ if (processId == 0) {
+ this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
@@ -121,22 +123,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
- boolean processFlag = true;
- List<String> appIds = Collections.emptyList();
-
- try {
- String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
- if (!StringUtils.isEmpty(pidsStr)) {
- String cmd = String.format("kill -9 %s", pidsStr);
- cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
- logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
- OSUtils.exeCmd(cmd);
- }
-
- } catch (Exception e) {
- processFlag = false;
- logger.error("kill task error", e);
- }
+ // kill system process
+ boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(),
@@ -146,27 +134,51 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
/**
- * build TaskKillResponseCommand
- *
- * @param killCommand kill command
- * @param result exe result
- * @return build TaskKillResponseCommand
+ * kill task by cancel application
+ * @param taskInstanceId
*/
- private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
- Pair<Boolean, List<String>> result) {
- TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
- taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
- taskKillResponseCommand.setAppIds(result.getRight());
- TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
- if (taskExecutionContext == null) {
- return taskKillResponseCommand;
+ protected void cancelApplication(int taskInstanceId) {
+ TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
+ if (taskExecuteThread == null) {
+ logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
+ return;
+ }
+ AbstractTask task = taskExecuteThread.getTask();
+ if (task == null) {
+ logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
+ return;
}
- if (taskExecutionContext != null) {
- taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskKillResponseCommand.setHost(taskExecutionContext.getHost());
- taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ try {
+ task.cancelApplication(true);
+ } catch (Exception e) {
+ logger.error("kill task error", e);
+ }
+ logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+ }
+
+ /**
+ * kill system process
+ * @param tenantCode
+ * @param processId
+ */
+ protected boolean killProcess(String tenantCode, Integer processId) {
+ boolean processFlag = true;
+ if (processId == null || processId.equals(0)) {
+ return true;
+ }
+ try {
+ String pidsStr = ProcessUtils.getPidsStr(processId);
+ if (!StringUtils.isEmpty(pidsStr)) {
+ String cmd = String.format("kill -9 %s", pidsStr);
+ cmd = OSUtils.getSudoCmd(tenantCode, cmd);
+ logger.info("process id:{}, cmd:{}", processId, cmd);
+ OSUtils.exeCmd(cmd);
+ }
+ } catch (Exception e) {
+ processFlag = false;
+ logger.error("kill task error", e);
}
- return taskKillResponseCommand;
+ return processFlag;
}
/**
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 9fffec7662..b58675b899 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -330,6 +330,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
+ public AbstractTask getTask() {
+ return task;
+ }
+
private void preBuildBusinessParams() {
Map<String, Property> paramsMap = new HashMap<>();
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
new file mode 100644
index 0000000000..ff050370cc
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class WorkerExecService {
+ /**
+ * logger of WorkerExecService
+ */
+ private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
+
+ private final ListeningExecutorService listeningExecutorService;
+
+ /**
+ * thread executor service
+ */
+ private final ExecutorService execService;
+
+ /**
+ * running task
+ */
+ private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+
+ public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+ this.execService = execService;
+ this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+ this.taskExecuteThreadMap = taskExecuteThreadMap;
+ }
+
+ public void submit(TaskExecuteThread taskExecuteThread) {
+ taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
+ ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
+ FutureCallback futureCallback = new FutureCallback() {
+ @Override
+ public void onSuccess(Object o) {
+ taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
+ , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
+ taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
+ }
+ };
+ Futures.addCallback(future, futureCallback, this.listeningExecutorService);
+ }
+
+ /**
+ * get thread pool queue size
+ *
+ * @return queue size
+ */
+ public int getThreadPoolQueueSize() {
+ return ((ThreadPoolExecutor) this.execService).getQueue().size();
+ }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 49ac9bc6d1..60f752401d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -54,7 +55,7 @@ public class WorkerManagerThread implements Runnable {
/**
* thread executor service
*/
- private final ExecutorService workerExecService;
+ private final WorkerExecService workerExecService;
/**
* task callback service
@@ -62,8 +63,20 @@ public class WorkerManagerThread implements Runnable {
@Autowired
private TaskCallbackService taskCallbackService;
+ /**
+ * running task
+ */
+ private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
public WorkerManagerThread(WorkerConfig workerConfig) {
- workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads());
+ workerExecService = new WorkerExecService(
+ ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
+ taskExecuteThreadMap
+ );
+ }
+
+ public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
+ return this.taskExecuteThreadMap.get(taskInstanceId);
}
/**
@@ -81,7 +94,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
- return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+ return this.workerExecService.getThreadPoolQueueSize();
}
/**