You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/02/23 10:22:54 UTC

[dolphinscheduler] branch dev updated: [Bug-8110][WorkerServer] kill all running task before worker stop (#8490)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f2b9796  [Bug-8110][WorkerServer] kill all running task before worker stop (#8490)
f2b9796 is described below

commit f2b9796bc20ef40e1737ea3afe9a00503b44ae3d
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Feb 23 18:22:42 2022 +0800

    [Bug-8110][WorkerServer] kill all running task before worker stop (#8490)
    
    * kill all running task before worker stop
    
    * kill local process
    
    * remove kill yarn job
    
    * adjust the orders of close
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../spi/task/TaskExecutionContextCacheManager.java |  5 ++++
 .../server/worker/WorkerServer.java                | 27 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
index c4347d6..e2ab195 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
 
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
         taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
         return taskRequestContextCache.containsKey(request.getTaskInstanceId());
     }
+
+    public static Collection<TaskRequest> getAllTaskRequestList() {
+        return taskRequestContextCache.values(); // the cache's own values() collection, not a copy (a live view if the cache is a java.util.Map -- TODO confirm)
+    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 6296aeb..e51f1d2 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -37,7 +37,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collection;
 import java.util.Set;
 
 import javax.annotation.PostConstruct;
@@ -201,6 +206,11 @@ public class WorkerServer implements IStoppable {
             this.nettyRemotingServer.close();
             this.workerRegistryClient.unRegistry();
             this.alertClientService.close();
+
+            // kill running tasks
+            this.killAllRunningTasks();
+
+            // close the application context
             this.springApplicationContext.close();
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
@@ -211,4 +221,21 @@ public class WorkerServer implements IStoppable {
     public void stop(String cause) {
         close(cause);
     }
+
+    /**
+     * Hand every task request currently held by TaskExecutionContextCacheManager to ProcessUtils.kill; called during worker stop, right before the application context is closed.
+     */
+    public void killAllRunningTasks() {
+        Collection<TaskRequest> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
+        logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
+
+        if (CollectionUtils.isEmpty(taskRequests)) {
+            return;
+        }
+
+        for (TaskRequest taskRequest : taskRequests) {
+            // no finished-state check is made here: each cached request is passed to kill as-is
+            org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest); // NOTE(review): a throw here would skip the remaining requests -- confirm kill() handles its own errors
+        }
+    }
 }