You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/20 14:46:26 UTC

[dolphinscheduler] branch dev updated: [Bug] cancel application when kill task (#9624)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 239be31ab7 [Bug] cancel application when kill task (#9624)
239be31ab7 is described below

commit 239be31ab732f5bbe68fca3033245125e8388657
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Apr 20 22:46:15 2022 +0800

    [Bug] cancel application when kill task (#9624)
    
    * cancel application when kill task
    
    * add warn log
    
    * add cancel application
---
 .../server/worker/processor/TaskKillProcessor.java | 86 ++++++++++++----------
 .../server/worker/runner/TaskExecuteThread.java    |  4 +
 .../server/worker/runner/WorkerExecService.java    | 85 +++++++++++++++++++++
 .../server/worker/runner/WorkerManagerThread.java  | 19 ++++-
 4 files changed, 154 insertions(+), 40 deletions(-)

diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 9873f1eaf6..ecc1ffe755 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
@@ -27,12 +28,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
@@ -93,8 +94,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             return;
         }
 
-        Integer processId = taskExecutionContext.getProcessId();
-        if (processId.equals(0)) {
+        int processId = taskExecutionContext.getProcessId();
+        if (processId == 0) {
+            this.cancelApplication(taskInstanceId);
             workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
             logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
@@ -121,22 +123,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @return kill result
      */
     private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
-        boolean processFlag = true;
-        List<String> appIds = Collections.emptyList();
-
-        try {
-            String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
-            if (!StringUtils.isEmpty(pidsStr)) {
-                String cmd = String.format("kill -9 %s", pidsStr);
-                cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
-                logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
-                OSUtils.exeCmd(cmd);
-            }
-
-        } catch (Exception e) {
-            processFlag = false;
-            logger.error("kill task error", e);
-        }
+        // kill system process
+        boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
         // find log and kill yarn job
         Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
                 taskExecutionContext.getLogPath(),
@@ -146,27 +134,51 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     }
 
     /**
-     * build TaskKillResponseCommand
-     *
-     * @param killCommand kill command
-     * @param result exe result
-     * @return build TaskKillResponseCommand
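+     * kill the task by calling cancelApplication on the task held by its TaskExecuteThread
+     * @param taskInstanceId id of the task instance whose TaskExecuteThread is looked up in workerManager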
      */
-    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
-                                                                 Pair<Boolean, List<String>> result) {
-        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
-        taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
-        taskKillResponseCommand.setAppIds(result.getRight());
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
-        if (taskExecutionContext == null) {
-            return taskKillResponseCommand;
+    protected void cancelApplication(int taskInstanceId) {
+        TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
+        if (taskExecuteThread == null) {
+            logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
+            return;
+        }
+        AbstractTask task = taskExecuteThread.getTask();
+        if (task == null) {
+            logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
+            return;
         }
-        if (taskExecutionContext != null) {
-            taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            taskKillResponseCommand.setHost(taskExecutionContext.getHost());
-            taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+        try {
+            task.cancelApplication(true);
+        } catch (Exception e) {
+            logger.error("kill task error", e);
+        }
+        logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
+    }
+
+    /**
+     * kill system process
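+     * @param tenantCode tenant code passed to OSUtils.getSudoCmd when building the kill command
+     * @param processId id of the process to kill; null or 0 returns true without killing anything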
+     */
+    protected boolean killProcess(String tenantCode, Integer processId) {
+        boolean processFlag = true;
+        if (processId == null || processId.equals(0)) {
+            return true;
+        }
+        try {
+            String pidsStr = ProcessUtils.getPidsStr(processId);
+            if (!StringUtils.isEmpty(pidsStr)) {
+                String cmd = String.format("kill -9 %s", pidsStr);
+                cmd = OSUtils.getSudoCmd(tenantCode, cmd);
+                logger.info("process id:{}, cmd:{}", processId, cmd);
+                OSUtils.exeCmd(cmd);
+            }
+        } catch (Exception e) {
+            processFlag = false;
+            logger.error("kill task error", e);
         }
-        return taskKillResponseCommand;
+        return processFlag;
     }
 
     /**
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 9fffec7662..b58675b899 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -330,6 +330,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
         return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
     }
 
+    public AbstractTask getTask() {
+        return task;
+    }
+
     private void preBuildBusinessParams() {
         Map<String, Property> paramsMap = new HashMap<>();
         // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
new file mode 100644
index 0000000000..ff050370cc
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class WorkerExecService {
+    /**
+     * logger of WorkerExecService
+     */
+    private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
+
+    private final ListeningExecutorService listeningExecutorService; // listening decorator around execService
+
+    /**
+     * underlying thread executor service; also queried for the queue size
+     */
+    private final ExecutorService execService;
+
+    /**
+     * running tasks keyed by task instance id; the map instance is supplied by the caller
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
+
+    public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+        this.execService = execService;
+        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
+        this.taskExecuteThreadMap = taskExecuteThreadMap;
+    }
+
+    public void submit(TaskExecuteThread taskExecuteThread) {
+        // registered before submission, so the entry exists while the task is still queued
+        taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
+        ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
+        FutureCallback futureCallback = new FutureCallback() {
+            @Override
+            public void onSuccess(Object o) {
+                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
+                    , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
+                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); // entry is removed on failure as well as on success
+            }
+        };
+        Futures.addCallback(future, futureCallback, this.listeningExecutorService); // callback is executed on the same listening executor
+    }
+
+    /**
+     * get thread pool queue size; execService is cast to ThreadPoolExecutor to read its queue
+     *
+     * @return number of entries currently in the executor's work queue
+     */
+    public int getThreadPoolQueueSize() {
+        return ((ThreadPoolExecutor) this.execService).getQueue().size();
+    }
+
+} 
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 49ac9bc6d1..60f752401d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,7 +55,7 @@ public class WorkerManagerThread implements Runnable {
     /**
      * thread executor service
      */
-    private final ExecutorService workerExecService;
+    private final WorkerExecService workerExecService;
 
     /**
      * task callback service
@@ -62,8 +63,20 @@ public class WorkerManagerThread implements Runnable {
     @Autowired
     private TaskCallbackService taskCallbackService;
 
+    /**
+     * running tasks keyed by task instance id, shared with workerExecService
+     */
+    private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
+
     public WorkerManagerThread(WorkerConfig workerConfig) {
-        workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads());
+        workerExecService = new WorkerExecService(
+            ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
+            taskExecuteThreadMap
+        );
+    }
+
+    public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
+        return this.taskExecuteThreadMap.get(taskInstanceId);
     }
 
     /**
@@ -81,7 +94,7 @@ public class WorkerManagerThread implements Runnable {
      * @return queue size
      */
     public int getThreadPoolQueueSize() {
-        return ((ThreadPoolExecutor) workerExecService).getQueue().size();
+        return this.workerExecService.getThreadPoolQueueSize();
     }
 
     /**