You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/09/06 02:12:48 UTC

[dolphinscheduler] branch dev updated: [Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45ebd528d6 [Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668)
45ebd528d6 is described below

commit 45ebd528d6f70df8f23d930a075cc0d7a451946d
Author: 冯剑 <35...@users.noreply.github.com>
AuthorDate: Tue Sep 6 10:12:40 2022 +0800

    [Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668)
---
 .../server/worker/processor/TaskKillProcessor.java | 39 ++++++++++++++--------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3aadb5a5cc..d73cc5b3db 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+
+import io.micrometer.core.lang.NonNull;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -43,6 +43,11 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
 import org.apache.dolphinscheduler.service.log.LogClientService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -78,7 +83,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
-                String.format("invalid command type : %s", command.getType()));
+            String.format("invalid command type : %s", command.getType()));
         TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
         if (killCommand == null) {
             logger.error("task kill request command is null");
@@ -88,7 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
         int taskInstanceId = killCommand.getTaskInstanceId();
         TaskExecutionContext taskExecutionContext =
-                TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+            TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
         if (taskExecutionContext == null) {
             logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
             return;
@@ -110,7 +115,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
 
         taskExecutionContext.setCurrentExecutionStatus(
-                result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
+            result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
         taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
         sendTaskKillResponseCommand(channel, taskExecutionContext);
 
@@ -123,7 +128,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
         TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
         taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
-        taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+        if (taskExecutionContext.getAppIds() != null) {
+            taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+        }
         taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         taskKillResponseCommand.setHost(taskExecutionContext.getHost());
         taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@@ -138,14 +145,20 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         });
     }
 
+    /**
+     * do kill
+     *
+     * @return kill result
+     */
     private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
         // kill system process
         boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
+
         // find log and kill yarn job
         Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
-                taskExecutionContext.getLogPath(),
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTenantCode());
+            taskExecutionContext.getLogPath(),
+            taskExecutionContext.getExecutePath(),
+            taskExecutionContext.getTenantCode());
         return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
     }
 
@@ -200,8 +213,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     /**
      * kill yarn job
      *
-     * @param host        host
-     * @param logPath     logPath
+     * @param host host
+     * @param logPath logPath
      * @param executePath executePath
      * @param tenantCode  tenantCode
      * @return Pair<Boolean, List < String>> yarn kill result
@@ -212,7 +225,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
                                                     String tenantCode) {
         if (logPath == null || executePath == null || tenantCode == null) {
             logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
-                    host, logPath, executePath, tenantCode);
+                host, logPath, executePath, tenantCode);
             return Pair.of(false, Collections.emptyList());
         }
         try (LogClientService logClient = new LogClientService()) {
@@ -233,4 +246,4 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         return Pair.of(false, Collections.emptyList());
     }
 
-}
+}
\ No newline at end of file