You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/12/11 07:41:45 UTC

[incubator-dolphinscheduler] branch dev updated: [FIX-#4172][server-worker] kill task NPE (#4182)

This is an automated email from the ASF dual-hosted git repository.

lgcareer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a13e737  [FIX-#4172][server-worker] kill task NPE (#4182)
a13e737 is described below

commit a13e737eb472b21b6b4c596ede9afc80fa3beb06
Author: Kirs <ac...@163.com>
AuthorDate: Fri Dec 11 15:41:35 2020 +0800

    [FIX-#4172][server-worker] kill task NPE (#4182)
    
    * [FIX-#4172][server-worker] kill task NPE
    
    Previously the task was written to the cache only after its Process had been created. If a kill request arrived before that, an NPE was thrown.
    Fix: write the task into the cache as soon as it is received, and mark it as preData.
    If the task is killed before the Process is created, the cache entry is simply deleted.
    Before the process is created, the cache is checked: if the task has already been killed, it is not executed.
    After the new process is created, it is written into the cache and the check is repeated; if the task was killed in the meantime, the process is killed.
    
    this closes #4172
    
    * Delete the commented out code
    Add spring beans
    
    * code smell
    
    * add test
    
    * add test
    
    * fix error
    
    * test
    
    * test
    
    * revert
    
    * fix error
---
 .../server/entity/TaskExecutionContext.java        |  33 +++---
 .../cache/TaskExecutionContextCacheManager.java    |  10 +-
 .../impl/TaskExecutionContextCacheManagerImpl.java |  16 ++-
 .../worker/processor/TaskExecuteProcessor.java     |  60 +++++++----
 .../server/worker/processor/TaskKillProcessor.java |  51 ++++-----
 .../worker/task/AbstractCommandExecutor.java       | 103 +++++++++++-------
 .../TaskExecutionContextCacheManagerTest.java      |  58 ++++++++++
 .../worker/processor/TaskCallbackServiceTest.java  | 113 +++++++-------------
 .../worker/processor/TaskKillProcessorTest.java    | 117 +++++++++++++++++++++
 9 files changed, 380 insertions(+), 181 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 7d28a77..a758028 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -38,7 +38,6 @@ public class TaskExecutionContext implements Serializable {
      */
     private int taskInstanceId;
 
-
     /**
      * task name
      */
@@ -506,22 +505,22 @@ public class TaskExecutionContext implements Serializable {
     @Override
     public String toString() {
         return "TaskExecutionContext{"
-                + "taskInstanceId=" + taskInstanceId
-                + ", taskName='" + taskName + '\''
-                + ", currentExecutionStatus=" + currentExecutionStatus
-                + ", firstSubmitTime=" + firstSubmitTime
-                + ", startTime=" + startTime
-                + ", taskType='" + taskType + '\''
-                + ", host='" + host + '\''
-                + ", executePath='" + executePath + '\''
-                + ", logPath='" + logPath + '\''
-                + ", taskJson='" + taskJson + '\''
-                + ", processId=" + processId
-                + ", appIds='" + appIds + '\''
-                + ", processInstanceId=" + processInstanceId
-                + ", scheduleTime=" + scheduleTime
-                + ", globalParams='" + globalParams + '\''
-                + ", executorId=" + executorId
+            + "taskInstanceId=" + taskInstanceId
+            + ", taskName='" + taskName + '\''
+            + ", currentExecutionStatus=" + currentExecutionStatus
+            + ", firstSubmitTime=" + firstSubmitTime
+            + ", startTime=" + startTime
+            + ", taskType='" + taskType + '\''
+            + ", host='" + host + '\''
+            + ", executePath='" + executePath + '\''
+            + ", logPath='" + logPath + '\''
+            + ", taskJson='" + taskJson + '\''
+            + ", processId=" + processId
+            + ", appIds='" + appIds + '\''
+            + ", processInstanceId=" + processInstanceId
+            + ", scheduleTime=" + scheduleTime
+            + ", globalParams='" + globalParams + '\''
+            + ", executorId=" + executorId
                 + ", cmdTypeIfComplement=" + cmdTypeIfComplement
                 + ", tenantCode='" + tenantCode + '\''
                 + ", queue='" + queue + '\''
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
index 7df8e01..71c795b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.cache;
 
-
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
@@ -42,7 +41,16 @@ public interface TaskExecutionContextCacheManager {
 
     /**
      * remove taskInstance by taskInstanceId
+     *
      * @param taskInstanceId taskInstanceId
      */
     void removeByTaskInstanceId(Integer taskInstanceId);
+
+    /**
+     * Updates the cached context if an entry for its task instance id is present and non-null; otherwise the cache is left unchanged and false is returned.
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @return status
+     */
+    boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext);
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
index 9c92fb2..5c3f990 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
@@ -19,13 +19,14 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
 
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
-import org.springframework.stereotype.Service;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.springframework.stereotype.Service;
+
 /**
- *  TaskExecutionContextCache
+ * TaskExecutionContextCache
  */
 @Service
 public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager {
@@ -34,7 +35,7 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex
     /**
      * taskInstance cache
      */
-    private Map<Integer,TaskExecutionContext> taskExecutionContextCache = new ConcurrentHashMap<>();
+    private Map<Integer, TaskExecutionContext> taskExecutionContextCache = new ConcurrentHashMap<>();
 
     /**
      * get taskInstance by taskInstance id
@@ -54,15 +55,22 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex
      */
     @Override
     public void cacheTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
-        taskExecutionContextCache.put(taskExecutionContext.getTaskInstanceId(),taskExecutionContext);
+        taskExecutionContextCache.put(taskExecutionContext.getTaskInstanceId(), taskExecutionContext);
     }
 
     /**
      * remove taskInstance by taskInstanceId
+     *
      * @param taskInstanceId taskInstanceId
      */
     @Override
     public void removeByTaskInstanceId(Integer taskInstanceId) {
         taskExecutionContextCache.remove(taskInstanceId);
     }
+
+    @Override
+    public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
+        // computeIfPresent returns null when the key is absent, so the update and its status come from one map operation
+        return taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext) != null;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 6080baf..3fe3b6d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -21,13 +21,11 @@ import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.common.utils.RetryerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -36,6 +34,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -47,44 +47,60 @@ import java.util.concurrent.ExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import io.netty.channel.Channel;
 
 /**
- *  worker request processor
+ * worker request processor
  */
 public class TaskExecuteProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
+    private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
 
     /**
-     *  thread executor service
+     * thread executor service
      */
     private final ExecutorService workerExecService;
 
     /**
-     *  worker config
+     * worker config
      */
     private final WorkerConfig workerConfig;
 
     /**
-     *  task callback service
+     * task callback service
      */
     private final TaskCallbackService taskCallbackService;
 
+    /**
+     * taskExecutionContextCacheManager
+     */
+    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+
     public TaskExecuteProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
+        this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+    }
+
+    /**
+     * Pre-caches the task as soon as it is received, so that a kill request arriving before the process exists still finds it in the cache.
+     *
+     * @param taskExecutionContext task execution context to put into the cache
+     */
+    private void setTaskCache(TaskExecutionContext taskExecutionContext) {
+        // Cache the received context itself: the cache manager keys entries by task instance id, so no
+        // separate placeholder object is needed for a later kill request to find this task.
+        taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+    }
 
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
-                String.format("invalid command type : %s", command.getType()));
+            String.format("invalid command type : %s", command.getType()));
 
         TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
-                command.getBody(), TaskExecuteRequestCommand.class);
+            command.getBody(), TaskExecuteRequestCommand.class);
 
         logger.info("received command : {}", taskRequestCommand);
 
@@ -100,11 +116,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             logger.error("task execution context is null");
             return;
         }
+        setTaskCache(taskExecutionContext);
         // custom logger
         Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-                taskExecutionContext.getProcessDefineId(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId()));
+            taskExecutionContext.getProcessDefineId(),
+            taskExecutionContext.getProcessInstanceId(),
+            taskExecutionContext.getTaskInstanceId()));
 
         taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
         taskExecutionContext.setStartTime(new Date());
@@ -120,13 +137,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
         } catch (Throwable ex) {
             String errorLog = String.format("create execLocalPath : %s", execLocalPath);
-            LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
+            LoggerUtils.logError(Optional.of(logger), errorLog, ex);
             LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
+            taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         }
         FileUtils.taskLoggerThreadLocal.remove();
 
         taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
+            new NettyRemoteChannel(channel, command.getOpaque()));
 
         this.doAck(taskExecutionContext);
 
@@ -134,15 +152,16 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
     }
 
-    private void doAck(TaskExecutionContext taskExecutionContext){
+    private void doAck(TaskExecutionContext taskExecutionContext) {
         // tell master that task is in executing
         TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
-        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK);
+        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
         taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
     }
 
     /**
      * build ack command
+     *
      * @param taskExecutionContext taskExecutionContext
      * @return TaskExecuteAckCommand
      */
@@ -164,13 +183,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
     /**
      * get execute local path
+     *
      * @param taskExecutionContext taskExecutionContext
      * @return execute local path
      */
     private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
         return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
-                taskExecutionContext.getProcessDefineId(),
-                taskExecutionContext.getProcessInstanceId(),
-                taskExecutionContext.getTaskInstanceId());
+            taskExecutionContext.getProcessDefineId(),
+            taskExecutionContext.getProcessInstanceId(),
+            taskExecutionContext.getTaskInstanceId());
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 21108d1..45268e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -48,19 +48,19 @@ import org.slf4j.LoggerFactory;
 import io.netty.channel.Channel;
 
 /**
- *  task kill processor
+ * task kill processor
  */
 public class TaskKillProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
 
     /**
-     *  worker config
+     * worker config
      */
     private final WorkerConfig workerConfig;
 
     /**
-     *  task callback service
+     * task callback service
      */
     private final TaskCallbackService taskCallbackService;
 
@@ -90,28 +90,29 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         Pair<Boolean, List<String>> result = doKill(killCommand);
 
         taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
+            new NettyRemoteChannel(channel, command.getOpaque()));
 
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
+        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
         taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
         taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
     }
 
     /**
-     *  do kill
+     * do kill
+     *
      * @param killCommand
      * @return kill result
      */
     private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
-        List<String> appIds = Collections.EMPTY_LIST;
+        List<String> appIds = Collections.emptyList();
         try {
-            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
-
+            int taskInstanceId = killCommand.getTaskInstanceId();
+            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
             Integer processId = taskExecutionContext.getProcessId();
-
-            if (processId == null || processId.equals(0)) {
-                logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
-                return Pair.of(false, appIds);
+            if (processId.equals(0)) {
+                taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+                logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
+                return Pair.of(true, appIds);
             }
 
             String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
@@ -122,9 +123,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
             // find log and kill yarn job
             appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
-                    taskExecutionContext.getLogPath(),
-                    taskExecutionContext.getExecutePath(),
-                    taskExecutionContext.getTenantCode());
+                taskExecutionContext.getLogPath(),
+                taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTenantCode());
 
             return Pair.of(true, appIds);
         } catch (Exception e) {
@@ -136,8 +137,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     /**
      * build TaskKillResponseCommand
      *
-     * @param killCommand  kill command
-     * @param result exe result
+     * @param killCommand kill command
+     * @param result      exe result
      * @return build TaskKillResponseCommand
      */
     private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
@@ -155,20 +156,20 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     }
 
     /**
-     *  kill yarn job
+     * kill yarn job
      *
-     * @param host host
-     * @param logPath logPath
+     * @param host        host
+     * @param logPath     logPath
      * @param executePath executePath
-     * @param tenantCode tenantCode
+     * @param tenantCode  tenantCode
      * @return List<String> appIds
      */
     private List<String> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
         LogClientService logClient = null;
         try {
             logClient = new LogClientService();
-            logger.info("view log host : {},logPath : {}", host,logPath);
-            String log  = logClient.viewLog(host, Constants.RPC_PORT, logPath);
+            logger.info("view log host : {},logPath : {}", host, logPath);
+            String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
 
             if (StringUtils.isNotEmpty(log)) {
                 List<String> appIds = LoggerUtils.getAppIds(log, logger);
@@ -182,7 +183,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
                 }
             }
         } catch (Exception e) {
-            logger.error("kill yarn job error",e);
+            logger.error("kill yarn job error", e);
         } finally {
             if (logClient != null) {
                 logClient.close();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 89af952..da5c0e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -14,36 +14,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task;
 
+import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
+import org.slf4j.Logger;
 
 /**
  * abstract command executor
@@ -56,22 +64,22 @@ public abstract class AbstractCommandExecutor {
 
     protected StringBuilder varPool = new StringBuilder();
     /**
-     *  process
+     * process
      */
     private Process process;
 
     /**
-     *  log handler
+     * log handler
      */
     protected Consumer<List<String>> logHandler;
 
     /**
-     *  logger
+     * logger
      */
     protected Logger logger;
 
     /**
-     *  log list
+     * log list
      */
     protected final List<String> logBuffer;
 
@@ -86,8 +94,8 @@ public abstract class AbstractCommandExecutor {
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
     public AbstractCommandExecutor(Consumer<List<String>> logHandler,
-                                   TaskExecutionContext taskExecutionContext ,
-                                   Logger logger){
+                                   TaskExecutionContext taskExecutionContext,
+                                   Logger logger) {
         this.logHandler = logHandler;
         this.taskExecutionContext = taskExecutionContext;
         this.logger = logger;
@@ -135,12 +143,18 @@ public abstract class AbstractCommandExecutor {
      * @return CommandExecuteResult
      * @throws Exception if error throws Exception
      */
-    public CommandExecuteResult run(String execCommand) throws Exception{
+    public CommandExecuteResult run(String execCommand) throws Exception {
 
         CommandExecuteResult result = new CommandExecuteResult();
 
-
+        int taskInstanceId = taskExecutionContext.getTaskInstanceId();
+        // If the task has been killed, then the task in the cache is null
+        if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+            result.setExitStatusCode(EXIT_CODE_KILL);
+            return result;
+        }
         if (StringUtils.isEmpty(execCommand)) {
+            taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
             return result;
         }
 
@@ -155,14 +169,18 @@ public abstract class AbstractCommandExecutor {
         // parse process output
         parseProcessOutput(process);
 
-
         Integer processId = getProcessId(process);
 
         result.setProcessId(processId);
 
         // cache processId
         taskExecutionContext.setProcessId(processId);
-        taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext);
+        if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
+            ProcessUtils.kill(taskExecutionContext);
+            result.setExitStatusCode(EXIT_CODE_KILL);
+            return result;
+        }
 
         // print process id
         logger.info("process start, process id is: {}", processId);
@@ -173,11 +191,10 @@ public abstract class AbstractCommandExecutor {
         // waiting for the run to finish
         boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
 
-
         logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
-                taskExecutionContext.getExecutePath(),
-                processId
-                , result.getExitStatusCode());
+            taskExecutionContext.getExecutePath(),
+            processId
+            , result.getExitStatusCode());
 
         // if SHELL task exit
         if (status) {
@@ -189,7 +206,7 @@ public abstract class AbstractCommandExecutor {
             result.setExitStatusCode(process.exitValue());
 
             // if yarn task , yarn state is final state
-            if (process.exitValue() == 0){
+            if (process.exitValue() == 0) {
                 result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
             }
         } else {
@@ -198,7 +215,6 @@ public abstract class AbstractCommandExecutor {
             result.setExitStatusCode(EXIT_CODE_FAILURE);
         }
 
-
         return result;
     }
 
@@ -208,6 +224,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * cancel application
+     *
      * @throws Exception exception
      */
     public void cancelApplication() throws Exception {
@@ -238,6 +255,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * soft kill
+     *
      * @param processId process id
      * @return process is alive
      * @throws InterruptedException interrupted exception
@@ -262,6 +280,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * hard kill
+     *
      * @param processId process id
      */
     private void hardKill(int processId) {
@@ -280,6 +299,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * print command
+     *
      * @param commands process builder
      */
     private void printCommand(List<String> commands) {
@@ -311,12 +331,13 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * get the standard output of the process
+     *
      * @param process process
      */
     private void parseProcessOutput(Process process) {
         String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
         ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
-        parseProcessOutputExecutorService.submit(new Runnable(){
+        parseProcessOutputExecutorService.submit(new Runnable() {
             @Override
             public void run() {
                 BufferedReader inReader = null;
@@ -337,7 +358,7 @@ public abstract class AbstractCommandExecutor {
                         }
                     }
                 } catch (Exception e) {
-                    logger.error(e.getMessage(),e);
+                    logger.error(e.getMessage(), e);
                 } finally {
                     clear();
                     close(inReader);
@@ -357,22 +378,22 @@ public abstract class AbstractCommandExecutor {
         boolean result = true;
         try {
             for (String appId : appIds) {
-                while(Stopper.isRunning()){
+                while (Stopper.isRunning()) {
                     ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
-                    logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
-                    if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
-                            applicationStatus.equals(ExecutionStatus.KILL)) {
+                    logger.info("appId:{}, final state:{}", appId, applicationStatus.name());
+                    if (applicationStatus.equals(ExecutionStatus.FAILURE)
+                        || applicationStatus.equals(ExecutionStatus.KILL)) {
                         return false;
                     }
 
-                    if (applicationStatus.equals(ExecutionStatus.SUCCESS)){
+                    if (applicationStatus.equals(ExecutionStatus.SUCCESS)) {
                         break;
                     }
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
             }
         } catch (Exception e) {
-            logger.error(String.format("yarn applications: %s  status failed ", appIds.toString()),e);
+            logger.error(String.format("yarn applications: %s  status failed ", appIds.toString()), e);
             result = false;
         }
         return result;
@@ -408,14 +429,15 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * convert file to list
+     *
      * @param filename file name
      * @return line list
      */
     private List<String> convertFile2List(String filename) {
         List lineList = new ArrayList<String>(100);
-        File file=new File(filename);
+        File file = new File(filename);
 
-        if (!file.exists()){
+        if (!file.exists()) {
             return lineList;
         }
 
@@ -427,13 +449,13 @@ public abstract class AbstractCommandExecutor {
                 lineList.add(line);
             }
         } catch (Exception e) {
-            logger.error(String.format("read file: %s failed : ",filename),e);
+            logger.error(String.format("read file: %s failed : ", filename), e);
         } finally {
-            if(br != null){
+            if (br != null) {
                 try {
                     br.close();
                 } catch (IOException e) {
-                    logger.error(e.getMessage(),e);
+                    logger.error(e.getMessage(), e);
                 }
             }
 
@@ -443,6 +465,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * find app id
+     *
      * @param line line
      * @return appid
      */
@@ -454,7 +477,6 @@ public abstract class AbstractCommandExecutor {
         return null;
     }
 
-
     /**
      * get remain time(s)
      *
@@ -495,7 +517,7 @@ public abstract class AbstractCommandExecutor {
     /**
      * when log buffer siz or flush time reach condition , then flush
      *
-     * @param lastFlushTime  last flush time
+     * @param lastFlushTime last flush time
      * @return last flush time
      */
     private long flush(long lastFlushTime) {
@@ -532,7 +554,10 @@ public abstract class AbstractCommandExecutor {
     protected List<String> commandOptions() {
         return Collections.emptyList();
     }
+
     protected abstract String buildCommandFilePath();
+
     protected abstract String commandInterpreter();
+
     protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
 }
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java
new file mode 100644
index 0000000..d871257
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.cache;
+
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for caching, updating and removing entries through {@link TaskExecutionContextCacheManager}.
+ */
+public class TaskExecutionContextCacheManagerTest {
+
+    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+    private TaskExecutionContext taskExecutionContext;
+
+    @Before
+    public void before() { // every test starts from a newly constructed manager implementation
+        taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
+    }
+
+    @Test
+    public void testGetByTaskInstanceId() { // a cached context must be retrievable by its task instance id
+        taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(2);
+        taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        Assert.assertEquals(2, taskExecutionContextCacheManager.getByTaskInstanceId(2).getTaskInstanceId());
+    }
+
+    @Test
+    public void updateTaskExecutionContext() { // update is expected to report whether the context is still cached
+        taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(1);
+        taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+        Assert.assertTrue(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext)); // cached: expect true
+        taskExecutionContextCacheManager.removeByTaskInstanceId(1);
+        Assert.assertFalse(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext)); // removed: expect false
+    }
+
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 2e00a16..5a5561d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.worker.processor;
 
-import java.util.Date;
+package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -37,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
@@ -44,10 +44,12 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import java.util.Date;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -56,26 +58,28 @@ import io.netty.channel.Channel;
 
 /**
  * test task call back service
+ * TODO: refactor this test to use mocks
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={
-        TaskCallbackServiceTestConfig.class,
-        SpringZKServer.class,
-        SpringApplicationContext.class,
-        MasterRegistry.class,
-        WorkerRegistry.class,
-        ZookeeperRegistryCenter.class,
-        MasterConfig.class,
-        WorkerConfig.class,
-        ZookeeperCachedOperator.class,
-        ZookeeperConfig.class,
-        ZookeeperNodeManager.class,
-        TaskCallbackService.class,
-        TaskResponseService.class,
-        TaskAckProcessor.class,
-        TaskResponseProcessor.class,
-        TaskExecuteProcessor.class,
-        CuratorZookeeperClient.class})
+@ContextConfiguration(classes = {
+    TaskCallbackServiceTestConfig.class,
+    SpringZKServer.class,
+    SpringApplicationContext.class,
+    MasterRegistry.class,
+    WorkerRegistry.class,
+    ZookeeperRegistryCenter.class,
+    MasterConfig.class,
+    WorkerConfig.class,
+    ZookeeperCachedOperator.class,
+    ZookeeperConfig.class,
+    ZookeeperNodeManager.class,
+    TaskCallbackService.class,
+    TaskResponseService.class,
+    TaskAckProcessor.class,
+    TaskResponseProcessor.class,
+    TaskExecuteProcessor.class,
+    CuratorZookeeperClient.class,
+    TaskExecutionContextCacheManagerImpl.class})
 public class TaskCallbackServiceTest {
 
     @Autowired
@@ -95,10 +99,11 @@ public class TaskCallbackServiceTest {
 
     /**
      * send ack test
+     *
      * @throws Exception
      */
     @Test
-    public void testSendAck() throws Exception{
+    public void testSendAck() throws Exception {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
@@ -122,10 +127,11 @@ public class TaskCallbackServiceTest {
 
     /**
      * send result test
+     *
      * @throws Exception
      */
     @Test
-    public void testSendResult() throws Exception{
+    public void testSendResult() throws Exception {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
@@ -136,7 +142,7 @@ public class TaskCallbackServiceTest {
         NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
         Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
         taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
-        TaskExecuteResponseCommand responseCommand  = new TaskExecuteResponseCommand();
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
         responseCommand.setTaskInstanceId(1);
         responseCommand.setEndTime(new Date());
 
@@ -152,20 +158,13 @@ public class TaskCallbackServiceTest {
         nettyRemotingClient.close();
     }
 
-//    @Test(expected = IllegalArgumentException.class)
-//    public void testSendAckWithIllegalArgumentException(){
-//        TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
-//        taskCallbackService.sendAck(1, ackCommand.convert2Command());
-//        Stopper.stop();
-//    }
-
     @Test
-    public void testPause(){
-        Assert.assertEquals(5000, taskCallbackService.pause(3));;
+    public void testPause() {
+        Assert.assertEquals(5000, taskCallbackService.pause(3));
     }
 
     @Test
-    public void testSendAck1(){
+    public void testSendAck1() {
         masterRegistry.registry();
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
@@ -177,7 +176,7 @@ public class TaskCallbackServiceTest {
         NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
         Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
         taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
-//        channel.close();
+        //        channel.close();
 
         TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
         ackCommand.setTaskInstanceId(1);
@@ -185,7 +184,7 @@ public class TaskCallbackServiceTest {
 
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
 
-        Assert.assertEquals(true, channel.isOpen());
+        Assert.assertTrue(channel.isOpen());
 
         Stopper.stop();
 
@@ -195,7 +194,7 @@ public class TaskCallbackServiceTest {
     }
 
     @Test
-    public void testTaskExecuteProcessor() throws Exception{
+    public void testTaskExecuteProcessor() throws Exception {
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(30000);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
@@ -207,11 +206,11 @@ public class TaskCallbackServiceTest {
 
         TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand();
 
-        nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+        nettyRemotingClient.send(new Host("localhost", 30000), taskExecuteRequestCommand.convert2Command());
 
         taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext()));
 
-        nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+        nettyRemotingClient.send(new Host("localhost", 30000), taskExecuteRequestCommand.convert2Command());
 
         Thread.sleep(5000);
 
@@ -223,40 +222,4 @@ public class TaskCallbackServiceTest {
         nettyRemotingClient.close();
     }
 
-//    @Test(expected = IllegalStateException.class)
-//    public void testSendAckWithIllegalStateException2(){
-//        masterRegistry.registry();
-//        final NettyServerConfig serverConfig = new NettyServerConfig();
-//        serverConfig.setListenPort(30000);
-//        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
-//        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
-//        nettyRemotingServer.start();
-//
-//        final NettyClientConfig clientConfig = new NettyClientConfig();
-//        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
-//        Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
-//        taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
-//        channel.close();
-//        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
-//        ackCommand.setTaskInstanceId(1);
-//        ackCommand.setStartTime(new Date());
-//
-//        nettyRemotingServer.close();
-//
-//        taskCallbackService.sendAck(1, ackCommand.convert2Command());
-//        try {
-//            Thread.sleep(5000);
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//
-//        Stopper.stop();
-//
-//        try {
-//            Thread.sleep(5000);
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//    }
-
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
new file mode 100644
index 0000000..36a758a
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.log.LogClientService;
+
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+/**
+ * Exercises {@link TaskKillProcessor#process} with its collaborators and static utilities replaced by PowerMock mocks.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskKillProcessor.class, OSUtils.class, ProcessUtils.class, LoggerUtils.class})
+public class TaskKillProcessorTest {
+
+    private TaskKillProcessor taskKillProcessor;
+
+    private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+
+    private Channel channel;
+
+    private Command command;
+
+    private TaskExecutionContext taskExecutionContext;
+
+    @Before
+    public void before() throws Exception { // builds the kill-request command and installs the mocks used by the test
+
+        TaskCallbackService taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+        WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+        taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
+
+        channel = PowerMockito.mock(Channel.class);
+        command = new Command();
+        command.setType(CommandType.TASK_KILL_REQUEST);
+        TaskKillRequestCommand taskKillRequestCommand = new TaskKillRequestCommand();
+        taskKillRequestCommand.setTaskInstanceId(1);
+        command.setBody(JSONUtils.toJsonString(taskKillRequestCommand).getBytes()); // NOTE(review): no explicit charset; consider StandardCharsets.UTF_8
+        taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(1);
+        LogClientService logClient = PowerMockito.mock(LogClientService.class);
+
+        NettyRemoteChannel nettyRemoteChannel = PowerMockito.mock(NettyRemoteChannel.class); // NOTE(review): not referenced again in this method
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        PowerMockito.mockStatic(OSUtils.class);
+        PowerMockito.mockStatic(ProcessUtils.class);
+        PowerMockito.mockStatic(LoggerUtils.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService); // bean lookups return the mocks above
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
+        PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any());
+        PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null); // intercepted constructor calls yield null
+        PowerMockito.when(OSUtils.exeCmd(any())).thenReturn(null);
+        PowerMockito.when(ProcessUtils.getPidsStr(102)).thenReturn("123"); // NOTE(review): stubbed for pid 102, but testProcess sets processId 101 - confirm
+        PowerMockito.whenNew(LogClientService.class).withAnyArguments().thenReturn(logClient);
+        PowerMockito.when(logClient.viewLog(any(), anyInt(), any())).thenReturn("test");
+        PowerMockito.when(LoggerUtils.getAppIds(any(), any())).thenReturn(Collections.singletonList("id"));
+
+        Command viewLogResponseCommand = new Command(); // NOTE(review): built but not used afterwards in this method
+        viewLogResponseCommand.setBody("success".getBytes());
+
+        taskKillProcessor = new TaskKillProcessor();
+    }
+
+    @Test
+    public void testProcess() { // NOTE(review): no assertions; passes as long as process() does not throw
+
+        PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext);
+        taskKillProcessor.process(channel, command); // first call: the cached context has only its task instance id set
+
+        taskExecutionContext.setProcessId(101);
+        taskExecutionContext.setHost("127.0.0.1:22");
+        taskExecutionContext.setLogPath("/log");
+        taskExecutionContext.setExecutePath("/path");
+        taskExecutionContext.setTenantCode("ten");
+        taskKillProcessor.process(channel, command); // second call: context now carries process id, host, paths and tenant
+    }
+
+}