You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/23 02:10:23 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-#3735] Make task delayed execution more efficient (#4812)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ffcb1c2  [Improvement-#3735] Make task delayed execution more efficient (#4812)
ffcb1c2 is described below

commit ffcb1c22e16107e387ceb21dc5a0faaef0783ae7
Author: guohaozhang <zh...@gmail.com>
AuthorDate: Tue Feb 23 10:10:11 2021 +0800

    [Improvement-#3735] Make task delayed execution more efficient (#4812)
    
    * [Improvement-3735] improve implementation of delay task execution
    
    * [Improvement][worker] delay task compatible with dev branch and fix test
    
    
    Co-authored-by: vanilla111 <11...@qq.com>
---
 .../server/worker/WorkerServer.java                |   7 +
 .../worker/processor/TaskExecuteProcessor.java     |  48 ++---
 .../server/worker/processor/TaskKillProcessor.java |   8 +
 .../server/worker/runner/TaskExecuteThread.java    |  48 ++---
 .../server/worker/runner/WorkerManagerThread.java  | 143 +++++++++++++++
 .../worker/processor/TaskCallbackServiceTest.java  |   4 +-
 .../worker/processor/TaskExecuteProcessorTest.java | 195 +++++++++++++++++++++
 .../worker/runner/WorkerManagerThreadTest.java     | 187 ++++++++++++++++++++
 pom.xml                                            |   2 +
 9 files changed, 599 insertions(+), 43 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 1f138f6..a267b5b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
 import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
@@ -84,6 +85,9 @@ public class WorkerServer {
     @Autowired
     private RetryReportTaskStatusThread retryReportTaskStatusThread;
 
+    @Autowired
+    private WorkerManagerThread workerManagerThread;
+
     /**
      * worker server startup
      *
@@ -119,6 +123,9 @@ public class WorkerServer {
         // worker registry
         this.workerRegistry.registry();
 
+        // task execute manager
+        this.workerManagerThread.start();
+
         // retry report task status
         this.retryReportTaskStatusThread.start();
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index fc8b33a..3088080 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +58,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
 
     /**
-     * thread executor service
-     */
-    private final ExecutorService workerExecService;
-
-    /**
      * worker config
      */
     private final WorkerConfig workerConfig;
@@ -73,20 +68,25 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     private final TaskCallbackService taskCallbackService;
 
     /**
-     *  alert client service
+     * alert client service
      */
     private AlertClientService alertClientService;
 
     /**
      * taskExecutionContextCacheManager
      */
-    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+    private final TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+
+    /**
+     * task execute manager
+     */
+    private final WorkerManagerThread workerManager;
 
     public TaskExecuteProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
-        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+        this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
     }
 
     /**
@@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     }
 
     public TaskExecuteProcessor(AlertClientService alertClientService) {
-        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
-        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
-        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
-        this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
-
+        this();
         this.alertClientService = alertClientService;
     }
 
@@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             taskExecutionContext.getTaskInstanceId()));
 
         taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
-        taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
-        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
 
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);
@@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
             new NettyRemoteChannel(channel, command.getOpaque()));
 
+        // delay task process
+        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
+        if (remainTime > 0) {
+            logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+            taskExecutionContext.setStartTime(null);
+        } else {
+            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+            taskExecutionContext.setStartTime(new Date());
+        }
+
         this.doAck(taskExecutionContext);
 
-        // submit task
-        workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
+        // submit task to manager
+        if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) {
+            logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
+        }
     }
 
     private void doAck(TaskExecutionContext taskExecutionContext) {
@@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
     /**
      * build ack command
+     *
      * @param taskExecutionContext taskExecutionContext
      * @return TaskExecuteAckCommand
      */
@@ -209,4 +217,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             taskExecutionContext.getProcessInstanceId(),
             taskExecutionContext.getTaskInstanceId());
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index b41c189..a3665b3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
@@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      */
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
+    /**
+     * task execute manager
+     */
+    private final WorkerManagerThread workerManager;
+
     public TaskKillProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+        this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
     }
 
     /**
@@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
             Integer processId = taskExecutionContext.getProcessId();
             if (processId.equals(0)) {
+                workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
                 taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
                 logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
                 return Pair.of(true, appIds);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 7216567..05bd806 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -51,7 +50,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException;
 /**
  *  task scheduler thread
  */
-public class TaskExecuteThread implements Runnable {
+public class TaskExecuteThread implements Runnable, Delayed {
 
     /**
      * logger
@@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable {
             // task node
             TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
 
-            delayExecutionIfNeeded();
             if (taskExecutionContext.getStartTime() == null) {
                 taskExecutionContext.setStartTime(new Date());
             }
@@ -290,24 +290,6 @@ public class TaskExecuteThread implements Runnable {
     }
 
     /**
-     * delay execution if needed.
-     */
-    private void delayExecutionIfNeeded() {
-        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
-                taskExecutionContext.getDelayTime() * 60L);
-        logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime);
-        if (remainTime > 0) {
-            try {
-                Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS);
-            } catch (Exception e) {
-                logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}",
-                            taskExecutionContext.getProcessInstanceId(),
-                            taskExecutionContext.getTaskInstanceId());
-            }
-        }
-    }
-
-    /**
      * send an ack to change the status of the task.
      */
     private void changeTaskExecutionStatusToRunning() {
@@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable {
         }
         return ackCommand;
     }
-}
\ No newline at end of file
+
+    /**
+     * Returns the execution context held by this task
+     * @return the TaskExecutionContext of this task
+     */
+    public TaskExecutionContext getTaskExecutionContext() {
+        return taskExecutionContext;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        long remainSeconds = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
+        return unit.convert(remainSeconds, TimeUnit.SECONDS); // the remaining time is converted from seconds
+    }
+
+    /** Orders by remaining delay in milliseconds; a null argument compares as smaller than this task. */
+    @Override
+    public int compareTo(Delayed o) {
+        return o == null
+                ? 1
+                : Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
new file mode 100644
index 0000000..073c948
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Queues submitted tasks in a DelayQueue and hands each one to the worker thread pool once its delay has expired
+ */
+@Component
+public class WorkerManagerThread implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
+
+    /**
+     * task queue, ordered by the remaining delay of each task
+     */
+    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
+
+    /**
+     * worker config
+     */
+    private final WorkerConfig workerConfig;
+
+    /**
+     * thread executor service
+     */
+    private final ExecutorService workerExecService;
+
+    /**
+     * taskExecutionContextCacheManager
+     */
+    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+
+    /**
+     * task callback service
+     */
+    private final TaskCallbackService taskCallbackService;
+
+    public WorkerManagerThread() {
+        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+        this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
+        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+    }
+
+    /**
+     * get queue size
+     *
+     * @return number of tasks currently waiting in the queue
+     */
+    public int getQueueSize() {
+        return workerExecuteQueue.size();
+    }
+
+    /**
+     * Remove a task that has not been executed yet (like a delay task) from the queue,
+     * then send a KILL response to the master so the status of the task instance is updated
+     *
+     * @param taskInstanceId task instance id
+     */
+    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
+        workerExecuteQueue.removeIf(task -> taskInstanceId != null && taskInstanceId.equals(task.getTaskExecutionContext().getTaskInstanceId()));
+        sendTaskKillResponse(taskInstanceId);
+    }
+
+    /**
+     * cache and send a KILL result for the task instance; does nothing if no execution context is cached for it
+     */
+    private void sendTaskKillResponse(Integer taskInstanceId) {
+        TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+        if (taskExecutionContext == null) {
+            return;
+        }
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
+        responseCommand.setStatus(ExecutionStatus.KILL.getCode());
+        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
+        taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+    }
+
+    /**
+     * submit task; the backing DelayQueue is unbounded, so the offer is not rejected for lack of capacity
+     *
+     * @param taskExecuteThread taskExecuteThread
+     * @return submit result
+     */
+    public boolean offer(TaskExecuteThread taskExecuteThread) {
+        return workerExecuteQueue.offer(taskExecuteThread);
+    }
+
+    /** start the manager loop on a new thread */
+    public void start() {
+        new Thread(this, this.getClass().getName()).start();
+    }
+
+    /** take tasks whose delay has expired and submit them to the worker thread pool until the Stopper stops */
+    @Override
+    public void run() {
+        Thread.currentThread().setName("Worker-Execute-Manager-Thread");
+        while (Stopper.isRunning()) {
+            try {
+                TaskExecuteThread taskExecuteThread = workerExecuteQueue.take();
+                workerExecService.submit(taskExecuteThread);
+            } catch (Exception e) {
+                logger.error("An unexpected interrupt is happened, "
+                        + "the exception will be ignored and this thread will continue to run", e);
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 5a5561d..bdd723a 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
@@ -79,7 +80,8 @@ import io.netty.channel.Channel;
     TaskResponseProcessor.class,
     TaskExecuteProcessor.class,
     CuratorZookeeperClient.class,
-    TaskExecutionContextCacheManagerImpl.class})
+    TaskExecutionContextCacheManagerImpl.class,
+    WorkerManagerThread.class})
 public class TaskCallbackServiceTest {
 
     @Autowired
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
new file mode 100644
index 0000000..fef2151
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the execution status TaskExecuteProcessor sets for normal and delayed tasks
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
+        JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+public class TaskExecuteProcessorTest {
+
+    private TaskExecutionContext taskExecutionContext;
+
+    private TaskCallbackService taskCallbackService;
+
+    private ExecutorService workerExecService;
+
+    private WorkerConfig workerConfig;
+
+    private Command command;
+
+    private Command ackCommand;
+
+    private TaskExecuteRequestCommand taskRequestCommand;
+
+    private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+
+    private AlertClientService alertClientService;
+
+    private WorkerManagerThread workerManager;
+
+    @Before
+    public void before() throws Exception {
+        // init task execution context
+        taskExecutionContext = getTaskExecutionContext();
+        workerConfig = new WorkerConfig();
+        workerConfig.setWorkerExecThreads(1);
+        workerConfig.setListenPort(1234);
+        command = new Command();
+        command.setType(CommandType.TASK_EXECUTE_REQUEST);
+        ackCommand = new TaskExecuteAckCommand().convert2Command();
+        taskRequestCommand = new TaskExecuteRequestCommand();
+        alertClientService = PowerMockito.mock(AlertClientService.class);
+        workerExecService = PowerMockito.mock(ExecutorService.class);
+        PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
+                .thenReturn(null);
+
+        PowerMockito.mockStatic(ChannelUtils.class);
+        PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
+
+        taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
+        taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+        PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
+                .thenReturn(taskCallbackService);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
+                .thenReturn(workerConfig);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+                .thenReturn(taskExecutionContextCacheManager);
+
+        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                taskExecutionContext.getProcessDefineId(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()));
+
+        // NOTE(review): the offer() stub below is keyed on a freshly built TaskExecuteThread, so it only applies
+        // to an argument equal to that instance - presumably Mockito.any(TaskExecuteThread.class) was meant; confirm
+        workerManager = PowerMockito.mock(WorkerManagerThread.class);
+        PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE);
+
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
+                .thenReturn(workerManager);
+
+        PowerMockito.mockStatic(ThreadUtils.class);
+        PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()))
+                .thenReturn(workerExecService);
+
+        PowerMockito.mockStatic(JsonSerializer.class);
+        PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
+                .thenReturn(taskRequestCommand);
+
+        PowerMockito.mockStatic(JSONUtils.class);
+        PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
+                .thenReturn(taskRequestCommand);
+        PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
+                .thenReturn(taskExecutionContext);
+
+        PowerMockito.mockStatic(FileUtils.class);
+        PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
+                taskExecutionContext.getProcessDefineId(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()))
+                .thenReturn(taskExecutionContext.getExecutePath());
+        PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
+
+        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
+        PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
+                .thenReturn(simpleTaskExecuteThread);
+    }
+
+    @Test
+    public void testNormalExecution() {
+        TaskExecuteProcessor processor = new TaskExecuteProcessor();
+        processor.process(null, command);
+
+        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+    }
+
+    @Test
+    public void testDelayExecution() {
+        taskExecutionContext.setDelayTime(1);
+        TaskExecuteProcessor processor = new TaskExecuteProcessor();
+        processor.process(null, command);
+
+        Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+    }
+
+    public TaskExecutionContext getTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setProcessId(12345);
+        taskExecutionContext.setProcessDefineId(1);
+        taskExecutionContext.setProcessInstanceId(1);
+        taskExecutionContext.setTaskInstanceId(1);
+        taskExecutionContext.setTaskType("sql");
+        taskExecutionContext.setFirstSubmitTime(new Date());
+        taskExecutionContext.setDelayTime(0);
+        taskExecutionContext.setLogPath("/tmp/test.log");
+        taskExecutionContext.setHost("localhost");
+        taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+        return taskExecutionContext;
+    }
+
+    private static class SimpleTaskExecuteThread extends TaskExecuteThread {
+
+        public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
+            super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
+        }
+
+        @Override
+        public void run() {
+            // intentionally empty
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
new file mode 100644
index 0000000..c6b0493
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link WorkerManagerThread}.
+ *
+ * <p>Each test offers one {@link TaskExecuteThread} to the manager and then checks the queue
+ * size reported by {@link WorkerManagerThread#getQueueSize()}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+        Stopper.class,
+        TaskManager.class,
+        JSONUtils.class,
+        CommonUtils.class,
+        SpringApplicationContext.class,
+        OSUtils.class})
+public class WorkerManagerThreadTest {
+
+    /** mocked callback service handed to every task execute thread */
+    private TaskCallbackService taskCallbackService;
+
+    /** manager under test, rebuilt before every test */
+    private WorkerManagerThread workerManager;
+
+    /** context shared by the queued task and the stubs */
+    private TaskExecutionContext taskExecutionContext;
+
+    /** mocked alert client handed to every task execute thread */
+    private AlertClientService alertClientService;
+
+    /** logger named from the ids of {@link #taskExecutionContext} */
+    private Logger taskLogger;
+
+    /**
+     * Prepares the fixture. The statement order is significant and kept as is: the commands are
+     * built before {@code JSONUtils} is statically mocked, and the Spring beans are stubbed
+     * before the {@link WorkerManagerThread} is constructed.
+     */
+    @Before
+    public void before() {
+        // init task execution context, logger
+        taskExecutionContext = createTaskExecutionContext();
+
+        Command ackCommand = new TaskExecuteAckCommand().convert2Command();
+        Command responseCommand =
+                new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command();
+
+        taskLogger = createTaskLogger();
+
+        TaskExecutionContextCacheManagerImpl cacheManager = new TaskExecutionContextCacheManagerImpl();
+        cacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+        // collaborators are plain mocks; the callbacks are explicitly stubbed as no-ops
+        alertClientService = PowerMockito.mock(AlertClientService.class);
+        WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+        taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+        PowerMockito.doNothing()
+                .when(taskCallbackService)
+                .sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+        PowerMockito.doNothing()
+                .when(taskCallbackService)
+                .sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
+
+        // bean lookups answer with the objects created above
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+                .thenReturn(cacheManager);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
+                .thenReturn(workerConfig);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
+                .thenReturn(taskCallbackService);
+        PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5);
+
+        workerManager = new WorkerManagerThread();
+
+        stubStaticUtilities();
+    }
+
+    /**
+     * Offers one task, asks the manager to kill task instance 1 before execution and expects
+     * the queue size to go from 1 to 0.
+     */
+    @Test
+    public void testSendTaskKillResponse() {
+        workerManager.offer(newTaskExecuteThread());
+        Assert.assertEquals(1, workerManager.getQueueSize());
+
+        workerManager.killTaskBeforeExecuteByInstanceId(1);
+        Assert.assertEquals(0, workerManager.getQueueSize());
+    }
+
+    /**
+     * Offers one task, invokes {@link WorkerManagerThread#run()} on the calling thread and
+     * expects the queue size to go from 1 to 0.
+     */
+    @Test
+    public void testRun() {
+        workerManager.offer(newTaskExecuteThread());
+        Assert.assertEquals(1, workerManager.getQueueSize());
+
+        workerManager.run();
+        Assert.assertEquals(0, workerManager.getQueueSize());
+    }
+
+    /**
+     * Builds the task execution context used by the fixture.
+     *
+     * @return a new context with fixed sample ids, tenant code "test", an empty task type and
+     *         delay time 0
+     */
+    private TaskExecutionContext createTaskExecutionContext() {
+        TaskExecutionContext context = new TaskExecutionContext();
+        context.setProcessId(12345);
+        context.setProcessDefineId(1);
+        context.setProcessInstanceId(1);
+        context.setTaskInstanceId(1);
+        context.setTenantCode("test");
+        context.setTaskType("");
+        context.setFirstSubmitTime(new Date());
+        context.setDelayTime(0);
+        context.setLogPath("/tmp/test.log");
+        context.setHost("localhost");
+        context.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+        return context;
+    }
+
+    /**
+     * Creates the logger whose name is built from the ids of {@link #taskExecutionContext}.
+     *
+     * @return the task logger
+     */
+    private Logger createTaskLogger() {
+        String loggerName = LoggerUtils.buildTaskId(
+                LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                taskExecutionContext.getProcessDefineId(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId());
+        return LoggerFactory.getLogger(loggerName);
+    }
+
+    /**
+     * Creates a {@link TaskExecuteThread} wired with the fixture's context, callback service,
+     * logger and alert client.
+     *
+     * @return a new, not yet queued task execute thread
+     */
+    private TaskExecuteThread newTaskExecuteThread() {
+        return new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
+    }
+
+    /**
+     * Statically mocks the utility classes and stubs the calls set up by this fixture.
+     */
+    private void stubStaticUtilities() {
+        PowerMockito.mockStatic(TaskManager.class);
+        PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
+                .thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
+
+        PowerMockito.mockStatic(JSONUtils.class);
+        PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class))
+                .thenReturn(new TaskNode());
+
+        PowerMockito.mockStatic(CommonUtils.class);
+        PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
+
+        List<String> osUsers = Collections.singletonList("test");
+        PowerMockito.mockStatic(OSUtils.class);
+        PowerMockito.when(OSUtils.getUserList()).thenReturn(osUsers);
+
+        // isRunning() answers true on the first call and false on every later call
+        PowerMockito.mockStatic(Stopper.class);
+        PowerMockito.when(Stopper.isRunning()).thenReturn(true, false);
+    }
+
+    /**
+     * Minimal {@link AbstractTask}: the lifecycle methods are empty, there are no parameters,
+     * and the exit status is always {@link ExecutionStatus#SUCCESS}.
+     */
+    private static class SimpleTask extends AbstractTask {
+
+        protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+            super(taskExecutionContext, logger);
+            // take the pid over from the context
+            this.processId = taskExecutionContext.getProcessId();
+        }
+
+        /**
+         * @return always {@code null}
+         */
+        @Override
+        public AbstractParameters getParameters() {
+            return null;
+        }
+
+        @Override
+        public void init() {
+            // nothing to initialise
+        }
+
+        @Override
+        public void handle() {
+            // nothing to execute
+        }
+
+        @Override
+        public void after() {
+            // nothing to clean up
+        }
+
+        /**
+         * @return always {@link ExecutionStatus#SUCCESS}
+         */
+        @Override
+        public ExecutionStatus getExitStatus() {
+            return ExecutionStatus.SUCCESS;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index c0974a5..c6880c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -913,6 +913,7 @@
                         <include>**/server/utils/ProcessUtilsTest.java</include>
                         <include>**/server/utils/SparkArgsUtilsTest.java</include>
                         <include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
+                        <include>**/server/worker/processor/TaskExecuteProcessorTest.java</include>
                         <include>**/server/worker/registry/WorkerRegistryTest.java</include>
                         <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         <include>**/server/worker/sql/SqlExecutorTest.java</include>
@@ -926,6 +927,7 @@
                         <include>**/server/worker/task/TaskManagerTest.java</include>
                         <include>**/server/worker/EnvFileTest.java</include>
                         <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
+                        <include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
                         <include>**/service/quartz/cron/CronUtilsTest.java</include>
                         <include>**/service/process/ProcessServiceTest.java</include>
                         <include>**/service/zk/DefaultEnsembleProviderTest.java</include>