You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/02/23 10:22:54 UTC
[dolphinscheduler] branch dev updated: [Bug-8110][WorkerServer] kill all running task before worker stop (#8490)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f2b9796 [Bug-8110][WorkerServer] kill all running task before worker stop (#8490)
f2b9796 is described below
commit f2b9796bc20ef40e1737ea3afe9a00503b44ae3d
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Feb 23 18:22:42 2022 +0800
[Bug-8110][WorkerServer] kill all running task before worker stop (#8490)
* kill all running task before worker stop
* kill local process
* remove kill yarn job
* adjust the orders of close
Co-authored-by: caishunfeng <53...@qq.com>
---
.../spi/task/TaskExecutionContextCacheManager.java | 5 ++++
.../server/worker/WorkerServer.java | 27 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
index c4347d6..e2ab195 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
return taskRequestContextCache.containsKey(request.getTaskInstanceId());
}
+
+ /**
+  * Returns a read-only view of every task request currently held in the cache.
+  *
+  * <p>The returned collection is backed by the cache, so it reflects later
+  * additions and removals, but it rejects modification: callers cannot evict
+  * cached requests through it.
+  *
+  * @return unmodifiable view of the cached task requests, never {@code null}
+  */
+ public static Collection<TaskRequest> getAllTaskRequestList() {
+     // Wrap the map's value view so the cache can only be changed through this manager.
+     return java.util.Collections.unmodifiableCollection(taskRequestContextCache.values());
+ }
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 6296aeb..e51f1d2 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -37,7 +37,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collection;
import java.util.Set;
import javax.annotation.PostConstruct;
@@ -201,6 +206,11 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
+
+ // kill running tasks
+ this.killAllRunningTasks();
+
+ // close the application context
this.springApplicationContext.close();
} catch (Exception e) {
logger.error("worker server stop exception ", e);
@@ -211,4 +221,21 @@ public class WorkerServer implements IStoppable {
public void stop(String cause) {
close(cause);
}
+
+ /**
+  * Kills every task that is still present in the task execution context cache.
+  *
+  * <p>Called while the worker is shutting down, before the application context
+  * is closed. The kill is best effort: a failure on one task is logged and the
+  * remaining tasks are still processed, so a single bad task can neither leave
+  * the others running nor abort the rest of the shutdown sequence.
+  */
+ public void killAllRunningTasks() {
+     Collection<TaskRequest> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
+     // Resolve the size null-safely before logging, so the guard is effective.
+     int taskCount = taskRequests == null ? 0 : taskRequests.size();
+     logger.info("ready to kill all cache job, job size:{}", taskCount);
+
+     if (taskCount == 0) {
+         return;
+     }
+
+     for (TaskRequest taskRequest : taskRequests) {
+         // Every cached request is killed; no finished/unfinished check is done here.
+         try {
+             org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
+         } catch (RuntimeException e) {
+             // Keep going: the other cached tasks must still be killed.
+             logger.error("kill task failed, task instance id:{}", taskRequest.getTaskInstanceId(), e);
+         }
+     }
+ }
}